Skip to content

Commit

Permalink
Add S3MultipartETag class with version control (linkedin#2739)
Browse files Browse the repository at this point in the history
 Add S3MultipartETag class with version control.
  • Loading branch information
JingQianCloud authored Mar 27, 2024
1 parent 4a4ee68 commit 2ec5676
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.github.ambry.router;
package com.github.ambry.frontend;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.ambry.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.Objects;


/**
Expand Down Expand Up @@ -96,24 +97,34 @@ public List<Pair<String, Long>> getOrderedChunkIdSizeList() {
}

/**
* Serialize a {@link PutBlobMetaInfo} object to a JSON string
* Serialize a {@link PutBlobMetaInfo} object to a JSON Object
* @param metaInfo the {@link PutBlobMetaInfo}
* @return the JSON string representation of the object
* @return the JSON object
*/
public static String serialize(PutBlobMetaInfo metaInfo) {
JSONObject rootObject = new JSONObject();

public static ObjectNode serializeToJsonObject(PutBlobMetaInfo metaInfo) {
ObjectNode rootObject = objectMapper.createObjectNode();
// The order of elements in JSON arrays is preserved.
// https://www.rfc-editor.org/rfc/rfc7159.html
// An object is an unordered collection of zero or more name/value pairs
// An array is an ordered sequence of zero or more values.
JSONArray chunks = new JSONArray();
ArrayNode chunks = objectMapper.createArrayNode();
for (Pair<String, Long> blobAndSize : metaInfo.orderedChunkIdSizeList) {
chunks.put(new JSONObject().put(BLOB, blobAndSize.getFirst()).put(SIZE, blobAndSize.getSecond()));
chunks.add(objectMapper.createObjectNode().put(BLOB, blobAndSize.getFirst()).put(SIZE, blobAndSize.getSecond()));
}
rootObject.put(CHUNKS, chunks);
rootObject.put(RESERVED_METADATA_CHUNK_ID, metaInfo.reservedMetadataChunkId);

return rootObject;
}

/**
 * Serialize a {@link PutBlobMetaInfo} object to a JSON string
 * @param metaInfo the {@link PutBlobMetaInfo}
 * @return the JSON string representation of the object
 */
public static String serialize(PutBlobMetaInfo metaInfo) {
  // Delegate to the JSON-object form and render it as text.
  return serializeToJsonObject(metaInfo).toString();
}

Expand Down Expand Up @@ -157,4 +168,19 @@ public static PutBlobMetaInfo deserialize(String metaInfo) throws IOException {
public String toString() {
  // A PutBlobMetaInfo renders as its JSON serialization.
  return serialize(this);
}

@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }

  PutBlobMetaInfo other = (PutBlobMetaInfo) o;

  return Objects.equals(reservedMetadataChunkId, other.reservedMetadataChunkId) && Objects.equals(
      orderedChunkIdSizeList, other.orderedChunkIdSizeList);
}

/**
 * Overridden together with {@link #equals(Object)} so equal instances hash identically,
 * as the equals/hashCode contract requires.
 */
@Override
public int hashCode() {
  return Objects.hash(reservedMetadataChunkId, orderedChunkIdSizeList);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/
package com.github.ambry.frontend.s3;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.ambry.frontend.PutBlobMetaInfo;
import com.github.ambry.utils.Pair;
import java.io.IOException;
import java.util.List;
import java.util.Objects;


/**
* This class holds ETag returned for the Multipart part upload.
* json example:
* {
* "dataChunkList": {
* "chunks": [
* {
* "blob": "AAYQ_3Z6AAoAAQAAAAAAAAAHyAWDw-xvSHWPD608xUQH3w",
* "size": 8024
* },
* {
* "blob": "AAYQ_3Z6ACWBADERGGGGGVVDASDEG-xv8xUQH3wAHyAWDw",
* "size": 4031
* }
* ],
* "reservedMetadataChunkId": null
* },
* "version": 1
* }
*/
public class S3MultipartETag {
  static final short VERSION_1 = 1;
  // Deliberately mutable (package-visible) so the serialization version can be bumped;
  // new ETags are always written at this version.
  static short CURRENT_VERSION = VERSION_1;
  private static final String VERSION = "version";
  private static final String DATA_CHUNK_LIST = "dataChunkList";
  private final List<Pair<String, Long>> orderedChunkIdSizeList;
  // Fixed at construction time; final so an ETag's version cannot drift after creation.
  private final short version;

  private static final ObjectMapper objectMapper = new ObjectMapper();

  /**
   * Construct an S3MultipartETag at {@link #CURRENT_VERSION}.
   * @param orderedChunkIdSizeList data chunk id and size list, in chunk order
   */
  public S3MultipartETag(List<Pair<String, Long>> orderedChunkIdSizeList) {
    version = CURRENT_VERSION;
    this.orderedChunkIdSizeList = orderedChunkIdSizeList;
  }

  /**
   * Get the version number.
   * @return the serialization version of this ETag
   */
  public short getVersion() {
    return version;
  }

  /**
   * Get the data chunk list in order.
   * @return the data chunk id/size list
   */
  public List<Pair<String, Long>> getOrderedChunkIdSizeList() {
    return orderedChunkIdSizeList;
  }

  /**
   * Serialize the {@link S3MultipartETag} to a JSON string.
   * @param eTag the {@link S3MultipartETag} to serialize
   * @return the serialized JSON string
   * @throws IOException if serialization fails
   */
  public static String serialize(S3MultipartETag eTag) throws IOException {
    ObjectNode rootObject = objectMapper.createObjectNode();

    // Reuse the PutBlobMetaInfo JSON layout for the chunk list; the reserved metadata chunk id
    // is intentionally null here since an ETag carries only the data chunks.
    PutBlobMetaInfo metaInfo = new PutBlobMetaInfo(eTag.getOrderedChunkIdSizeList(), null);
    ObjectNode chunks = PutBlobMetaInfo.serializeToJsonObject(metaInfo);
    // ObjectNode.put(String, JsonNode) is deprecated in Jackson; set() is the supported form.
    rootObject.set(DATA_CHUNK_LIST, chunks);
    rootObject.put(VERSION, CURRENT_VERSION);

    return rootObject.toString();
  }

  /**
   * Deserialize the JSON string to an {@link S3MultipartETag}.
   * @param eTagStr the serialized JSON string
   * @return the {@link S3MultipartETag}
   * @throws IOException if the string is not valid JSON, the version field is missing, or the
   *         version is unsupported
   */
  public static S3MultipartETag deserialize(String eTagStr) throws IOException {
    JsonNode rootNode;
    try {
      rootNode = objectMapper.readTree(eTagStr);
    } catch (JsonProcessingException e) {
      // Chain the parse failure as the cause instead of flattening it into the message.
      throw new IOException("Not expected JSON content " + eTagStr, e);
    }

    // version id
    JsonNode jsonNode = rootNode.get(VERSION);
    if (jsonNode == null) {
      // Guard against an NPE when the version field is absent entirely.
      throw new IOException("Missing version number " + eTagStr);
    }
    short version = jsonNode.shortValue();
    if (version != VERSION_1) {
      throw new IOException("Wrong version number " + eTagStr);
    }

    // data chunk list
    jsonNode = rootNode.get(DATA_CHUNK_LIST);
    PutBlobMetaInfo putBlobMetaInfo = PutBlobMetaInfo.deserialize(jsonNode.toString());
    return new S3MultipartETag(putBlobMetaInfo.getOrderedChunkIdSizeList());
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    S3MultipartETag other = (S3MultipartETag) o;

    return version == other.version && Objects.equals(orderedChunkIdSizeList, other.orderedChunkIdSizeList);
  }

  /**
   * Overridden together with {@link #equals(Object)} so equal ETags hash identically,
   * as the equals/hashCode contract requires.
   */
  @Override
  public int hashCode() {
    return Objects.hash(version, orderedChunkIdSizeList);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.rest.RestUtils;
import com.github.ambry.router.ChunkInfo;
import com.github.ambry.router.PutBlobMetaInfo;
import com.github.ambry.router.PutBlobOptions;
import com.github.ambry.router.PutBlobOptionsBuilder;
import com.github.ambry.router.ReadableStreamChannel;
Expand Down Expand Up @@ -364,30 +363,28 @@ List<ChunkInfo> getChunksToStitch(CompleteMultipartUpload completeMultipartUploa
Collections.sort(sortedParts, Comparator.comparingInt(Part::getPartNumber));
String reservedMetadataId = null;
for (Part part : sortedParts) {
PutBlobMetaInfo putBlobMetaInfoObj = PutBlobMetaInfo.deserialize(part.geteTag());
// reservedMetadataId can be null. but if it's not null, they are supposed to be the same
String reserved = putBlobMetaInfoObj.getReservedMetadataChunkId();
if (reservedMetadataId == null) {
reservedMetadataId = reserved;
}
if (reserved != null && !reserved.equals(reservedMetadataId)) {
String error = "Reserved ID are different " + completeMultipartUpload;
throw new RestServiceException(error, RestServiceErrorCode.BadRequest);
}
S3MultipartETag eTag = S3MultipartETag.deserialize(part.geteTag());
// TODO [S3]: decide the life cycle of S3.
long expirationTimeInMs = -1;

List<Pair<String, Long>> chunks = putBlobMetaInfoObj.getOrderedChunkIdSizeList();
for (int i = 0; i < putBlobMetaInfoObj.getNumChunks(); i++) {
String blobId = chunks.get(i).getFirst();
long chunkSize = chunks.get(i).getSecond();
if (eTag.getVersion() == S3MultipartETag.VERSION_1) {
List<Pair<String, Long>> chunks = eTag.getOrderedChunkIdSizeList();
for (int i = 0; i < chunks.size(); i++) {
String blobId = chunks.get(i).getFirst();
long chunkSize = chunks.get(i).getSecond();

ChunkInfo chunk = new ChunkInfo(blobId, chunkSize, expirationTimeInMs, reservedMetadataId);
chunkInfos.add(chunk);
ChunkInfo chunk = new ChunkInfo(blobId, chunkSize, expirationTimeInMs, reservedMetadataId);
chunkInfos.add(chunk);
}
} else {
String error = "Wrong ETag version " + completeMultipartUpload + " rest request " + restRequest;
LOGGER.error(error);
throw new RestServiceException(error, RestServiceErrorCode.BadRequest);
}
}
} catch (IOException e) {
String error = "Could not parse xml request body " + completeMultipartUpload;
String error = "Could not parse xml request body " + completeMultipartUpload + " rest request " + restRequest;
LOGGER.error(error);
throw new RestServiceException(error, e, RestServiceErrorCode.BadRequest);
}
return chunkInfos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import com.github.ambry.rest.RestResponseChannel;
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.rest.RestUtils;
import com.github.ambry.frontend.PutBlobMetaInfo;
import com.github.ambry.router.PutBlobOptions;
import com.github.ambry.router.PutBlobOptionsBuilder;
import com.github.ambry.router.ReadableStreamChannel;
import com.github.ambry.router.Router;
import com.github.ambry.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
Expand Down Expand Up @@ -178,7 +178,10 @@ private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) {
// TODO [S3] Make changes to sign ETags. Currently they are sent as shown below.
// ETag: {"chunks":[{"blob":"AAYQAQBlAAgAAQAAAAAAAAAAw6UGCoNgS8KGgV-SGXAMdQ","size":4194304},
// {"blob":"AAYQAQBlAAgAAQAAAAAAAAAA6Jaga4MiRFiBXi_jjaiQhg","size":1052262}]}
restResponseChannel.setHeader(RestUtils.Headers.LOCATION, blobId);
PutBlobMetaInfo putBlobMetaInfo = PutBlobMetaInfo.deserialize(blobId);
S3MultipartETag etag = new S3MultipartETag(putBlobMetaInfo.getOrderedChunkIdSizeList());
String eTagStr = S3MultipartETag.serialize(etag);
restResponseChannel.setHeader(RestUtils.Headers.LOCATION, eTagStr);
securityService.processResponse(restRequest, restResponseChannel, blobInfo, securityProcessResponseCallback());
}, uri, logger, finalCallback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.github.ambry.commons.ByteBufferAsyncWritableChannel;
import com.github.ambry.commons.Callback;
import com.github.ambry.config.RouterConfig;
import com.github.ambry.frontend.PutBlobMetaInfo;
import com.github.ambry.frontend.ReservedMetadataIdMetrics;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.messageformat.BlobType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.github.ambry.commons.BlobId;
import com.github.ambry.commons.ByteBufferReadableStreamChannel;
import com.github.ambry.commons.RetainingAsyncWritableChannel;
import com.github.ambry.frontend.PutBlobMetaInfo;
import com.github.ambry.frontend.s3.S3MultipartETag;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.messageformat.MessageFormatRecord;
import com.github.ambry.rest.MockRestRequest;
Expand All @@ -39,6 +41,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.json.JSONException;
import org.json.JSONObject;
Expand All @@ -59,6 +62,8 @@ public class S3NonBlockingRouterTest extends NonBlockingRouterTestBase {
private static final String CLUSTER_NAME = "ambry-test";
private static final String S3_PREFIX = "/s3";
private static final String SLASH = "/";
private static final Random random = new Random();

private String accountName = "myAccount";
private String containerName = "container-a";
private String blobName = "MyDirectory/MyKey";
Expand Down Expand Up @@ -237,6 +242,27 @@ public void testS3MultiPartUpload() throws Exception {
router.getBlob(blobId, new GetBlobOptionsBuilder().build()).get();
}

@Test
public void testS3MultipartETag() throws Exception {
  // Build a random chunk id/size list to round-trip through the ETag.
  int chunkCount = 100;
  List<Pair<String, Long>> expectedChunks = new ArrayList<>();
  for (int i = 0; i < chunkCount; i++) {
    expectedChunks.add(new Pair<>(String.valueOf(random.nextLong()), random.nextLong()));
  }

  S3MultipartETag originalETag = new S3MultipartETag(expectedChunks);

  // A serialize/deserialize round trip must yield an equal ETag with the same chunk list.
  String serialized = S3MultipartETag.serialize(originalETag);
  S3MultipartETag roundTripped = S3MultipartETag.deserialize(serialized);
  Assert.assertEquals(originalETag, roundTripped);
  List<Pair<String, Long>> actualChunks = roundTripped.getOrderedChunkIdSizeList();
  for (int i = 0; i < chunkCount; i++) {
    Assert.assertEquals(expectedChunks.get(i).getFirst(), actualChunks.get(i).getFirst());
    Assert.assertEquals(expectedChunks.get(i).getSecond(), actualChunks.get(i).getSecond());
  }
}

private RestRequest createRestRequestForPutOperation()
throws JSONException, UnsupportedEncodingException, URISyntaxException {
String uri = S3_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + blobName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.github.ambry.commons.CommonTestUtils;
import com.github.ambry.config.RouterConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.frontend.PutBlobMetaInfo;
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.notification.NotificationBlobType;
Expand Down

0 comments on commit 2ec5676

Please sign in to comment.