feat: introduce CRC32 checksums to storage:uploadData API (#13649)
Co-authored-by: Donny Wu <[email protected]>
eppjame and wuuxigh authored Aug 1, 2024
1 parent 30404f4 commit a06c2f9
Showing 21 changed files with 584 additions and 44 deletions.
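The diffs below raise two bundle-size budgets and extend the multipart upload tests so that every UploadPart request is asserted to carry a base64-encoded CRC32 value in its ChecksumCRC32 parameter. As a rough illustration of the shape of that value, here is a minimal sketch of a table-based CRC32 reduced to the base64 form S3 accepts; it only stands in for the package's calculateContentCRC32 helper, whose implementation is not shown in this view.

// Sketch only: a plain table-based CRC32, base64-encoded the way the
// ChecksumCRC32 parameter expects. Not the library's implementation.
const CRC_TABLE = new Uint32Array(256).map((_, n) => {
  let c = n;
  for (let k = 0; k < 8; k++) {
    c = c & 1 ? 0xedb88320 ^ (c >>> 1) : c >>> 1;
  }
  return c >>> 0;
});

const crc32Base64 = (data: Uint8Array): string => {
  let crc = 0xffffffff;
  for (const byte of data) {
    crc = (CRC_TABLE[(crc ^ byte) & 0xff] ^ (crc >>> 8)) >>> 0;
  }
  crc = (crc ^ 0xffffffff) >>> 0;
  // S3 expects the big-endian 4-byte checksum, base64-encoded.
  const bytes = new Uint8Array([
    crc >>> 24,
    (crc >>> 16) & 0xff,
    (crc >>> 8) & 0xff,
    crc & 0xff,
  ]);
  return Buffer.from(bytes).toString('base64');
};

Asserted values such as 'JCnBsQ==' in the tests below have exactly this shape: four checksum bytes, base64-encoded.
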
4 changes: 2 additions & 2 deletions packages/aws-amplify/package.json
@@ -299,7 +299,7 @@
"name": "[Analytics] record (Kinesis)",
"path": "./dist/esm/analytics/kinesis/index.mjs",
"import": "{ record }",
"limit": "48.69 kB"
"limit": "48.8 kB"
},
{
"name": "[Analytics] record (Kinesis Firehose)",
@@ -497,7 +497,7 @@
"name": "[Storage] uploadData (S3)",
"path": "./dist/esm/storage/index.mjs",
"import": "{ uploadData }",
"limit": "20.48 kB"
"limit": "21.63 kB"
}
]
}
4 changes: 2 additions & 2 deletions packages/interactions/package.json
@@ -89,13 +89,13 @@
"name": "Interactions (default to Lex v2)",
"path": "./dist/esm/index.mjs",
"import": "{ Interactions }",
"limit": "52.66 kB"
"limit": "54.05 kB"
},
{
"name": "Interactions (Lex v2)",
"path": "./dist/esm/lex-v2/index.mjs",
"import": "{ Interactions }",
"limit": "52.66 kB"
"limit": "54.05 kB"
},
{
"name": "Interactions (Lex v1)",
@@ -1,6 +1,9 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Blob as BlobPolyfill, File as FilePolyfill } from 'node:buffer';
import { WritableStream as WritableStreamPolyfill } from 'node:stream/web';

import { AWSCredentials } from '@aws-amplify/core/internals/utils';
import { Amplify, defaultStorage } from '@aws-amplify/core';

@@ -22,9 +25,16 @@ import { byteLength } from '../../../../../src/providers/s3/apis/uploadData/byteLength';
import { CanceledError } from '../../../../../src/errors/CanceledError';
import { StorageOptions } from '../../../../../src/types';
import '../testUtils';
import { calculateContentCRC32 } from '../../../../../src/providers/s3/utils/crc32';
import { calculateContentMd5 } from '../../../../../src/providers/s3/utils';

global.Blob = BlobPolyfill as any;
global.File = FilePolyfill as any;
global.WritableStream = WritableStreamPolyfill as any;

jest.mock('@aws-amplify/core');
jest.mock('../../../../../src/providers/s3/utils/client/s3data');
jest.mock('../../../../../src/providers/s3/utils/crc32');

const credentials: AWSCredentials = {
accessKeyId: 'accessKeyId',
@@ -47,11 +57,44 @@ const mockCompleteMultipartUpload = jest.mocked(completeMultipartUpload);
const mockAbortMultipartUpload = jest.mocked(abortMultipartUpload);
const mockListParts = jest.mocked(listParts);
const mockHeadObject = jest.mocked(headObject);
const mockCalculateContentCRC32 = jest.mocked(calculateContentCRC32);

const disableAssertionFlag = true;

const MB = 1024 * 1024;

jest.mock('../../../../../src/providers/s3/utils', () => ({
...jest.requireActual('../../../../../src/providers/s3/utils'),
calculateContentMd5: jest.fn(),
}));

const getZeroDelayTimeout = () =>
new Promise<void>(resolve => {
setTimeout(() => {
resolve();
}, 0);
});

const mockCalculateContentCRC32Mock = () => {
mockCalculateContentCRC32.mockReset();
mockCalculateContentCRC32.mockResolvedValue({
checksumArrayBuffer: new ArrayBuffer(0),
checksum: 'mockChecksum',
seed: 0,
});
};
const mockCalculateContentCRC32Undefined = () => {
mockCalculateContentCRC32.mockReset();
mockCalculateContentCRC32.mockResolvedValue(undefined);
};
const mockCalculateContentCRC32Reset = () => {
mockCalculateContentCRC32.mockReset();
mockCalculateContentCRC32.mockImplementation(
jest.requireActual('../../../../../src/providers/s3/utils/crc32')
.calculateContentCRC32,
);
};

const mockMultipartUploadSuccess = (disableAssertion?: boolean) => {
let totalSize = 0;
mockCreateMultipartUpload.mockResolvedValueOnce({
@@ -149,9 +192,10 @@ describe('getMultipartUploadHandlers with key', () => {
});
});

afterEach(() => {
beforeEach(() => {
jest.clearAllMocks();
resetS3Mocks();
mockCalculateContentCRC32Reset();
});

it('should return multipart upload handlers', async () => {
@@ -230,6 +274,69 @@
);
});

it.each([
[
'file',
new File([getBlob(8 * MB)], 'someName'),
['JCnBsQ==', 'HELzGQ=='],
],
['blob', getBlob(8 * MB), ['JCnBsQ==', 'HELzGQ==']],
['string', 'Ü'.repeat(4 * MB), ['DL735w==', 'Akga7g==']],
['arrayBuffer', new ArrayBuffer(8 * MB), ['yTuzdQ==', 'eXJPxg==']],
['arrayBufferView', new Uint8Array(8 * MB), ['yTuzdQ==', 'eXJPxg==']],
])(
`should create crc32 for %s type body`,
async (_, twoPartsPayload, expectedCrc32) => {
mockMultipartUploadSuccess();
const { multipartUploadJob } = getMultipartUploadHandlers({
key: defaultKey,
data: twoPartsPayload,
});
await multipartUploadJob();

/**
* The final crc32 calculation calls calculateContentCRC32 3 times:
* once for each of the 2 parts, plus once to combine the two
* resulting part hashes.
*
* Uploading each part calls calculateContentCRC32 once more,
* adding 2 calls.
*
* These steps result in 5 calls in total.
*/
expect(calculateContentCRC32).toHaveBeenCalledTimes(5);
expect(calculateContentMd5).not.toHaveBeenCalled();
expect(mockUploadPart).toHaveBeenCalledTimes(2);
expect(mockUploadPart).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ ChecksumCRC32: expectedCrc32[0] }),
);
expect(mockUploadPart).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ ChecksumCRC32: expectedCrc32[1] }),
);
},
);

it('should use md5 if crc32 is returning undefined', async () => {
mockCalculateContentCRC32Undefined();
mockMultipartUploadSuccess();
Amplify.libraryOptions = {
Storage: {
S3: {
isObjectLockEnabled: true,
},
},
};
const { multipartUploadJob } = getMultipartUploadHandlers({
key: defaultKey,
data: new Uint8Array(8 * MB),
});
await multipartUploadJob();
expect(calculateContentCRC32).toHaveBeenCalledTimes(1); // the single overall crc32 attempt resolves undefined, so md5 is used instead
expect(calculateContentMd5).toHaveBeenCalledTimes(2);
expect(mockUploadPart).toHaveBeenCalledTimes(2);
});
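
What the fallback test above pins down: when the CRC32 helper cannot produce a checksum, the handler computes an MD5 digest per part instead, at least when the bucket has Object Lock enabled (the only configuration this test exercises). A small sketch of that selection logic, using hypothetical names rather than the handler's internal API, and assuming the hash is simply omitted when neither value is required:

// Sketch of the checksum fallback exercised above. pickPartIntegrity is a
// hypothetical name; the real logic lives inside the multipart upload
// handler under test.
interface PartIntegrityHeaders {
  ChecksumCRC32?: string;
  ContentMD5?: string;
}

const pickPartIntegrity = async (
  crc32Checksum: string | undefined, // result of calculateContentCRC32, if any
  computeMd5: () => Promise<string>, // lazy MD5, only computed when needed
  isObjectLockEnabled: boolean,
): Promise<PartIntegrityHeaders> => {
  if (crc32Checksum !== undefined) {
    return { ChecksumCRC32: crc32Checksum };
  }
  if (isObjectLockEnabled) {
    // Object Lock requires some integrity header, so fall back to MD5.
    return { ContentMD5: await computeMd5() };
  }
  // Assumption: no integrity header when CRC32 is unavailable and not required.
  return {};
};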

it('should throw if unsupported payload type is provided', async () => {
mockMultipartUploadSuccess();
const { multipartUploadJob } = getMultipartUploadHandlers({
@@ -244,6 +351,7 @@
});

it('should upload a body that exceeds the size of default part size and parts count', async () => {
mockCalculateContentCRC32Mock();
let buffer: ArrayBuffer;
const file = {
__proto__: File.prototype,
@@ -268,7 +376,7 @@
file.size,
);
await multipartUploadJob();
expect(file.slice).toHaveBeenCalledTimes(10_000); // S3 limit of parts count
expect(file.slice).toHaveBeenCalledTimes(10_000 * 2); // S3 parts count limit, doubled because each part is sliced again for the crc32 calculation
expect(mockCreateMultipartUpload).toHaveBeenCalledTimes(1);
expect(mockUploadPart).toHaveBeenCalledTimes(10_000);
expect(mockCompleteMultipartUpload).toHaveBeenCalledTimes(1);
@@ -621,6 +729,15 @@

describe('pause() & resume()', () => {
it('should abort in-flight uploadPart requests if upload is paused', async () => {
let pausedOnce = false;

let resumeTest: () => void;
const waitForPause = new Promise<void>(resolve => {
resumeTest = () => {
resolve();
};
});

const { multipartUploadJob, onPause, onResume } =
getMultipartUploadHandlers({
key: defaultKey,
@@ -629,16 +746,21 @@
let partCount = 0;
mockMultipartUploadCancellation(() => {
partCount++;
if (partCount === 2) {
if (partCount === 2 && !pausedOnce) {
onPause(); // Pause upload at the last uploadPart call
resumeTest();
pausedOnce = true;
}
});
const uploadPromise = multipartUploadJob();
await waitForPause;
await getZeroDelayTimeout();
onResume();
await uploadPromise;
expect(mockUploadPart).toHaveBeenCalledTimes(2);
expect(mockUploadPart).toHaveBeenCalledTimes(3);
expect(mockUploadPart.mock.calls[0][0].abortSignal?.aborted).toBe(true);
expect(mockUploadPart.mock.calls[1][0].abortSignal?.aborted).toBe(true);
expect(mockUploadPart.mock.calls[2][0].abortSignal?.aborted).toBe(false);
});
});
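
What the pause/resume assertions above add up to: pausing aborts the UploadPart requests already in flight through their shared abort signal, and resuming issues the remaining parts with a fresh, non-aborted signal, which is why a third uploadPart call with an unaborted signal appears. A minimal sketch of that wiring, assuming one AbortController per upload session (illustrative only, not the handler's actual internals):

// Sketch only: pause cancels in-flight part uploads; resume hands out a
// fresh signal for the parts that still need to be sent.
class PausablePartUploader {
  private controller = new AbortController();

  get signal(): AbortSignal {
    // Passed to every in-flight uploadPart request.
    return this.controller.signal;
  }

  pause(): void {
    // Cancels the requests currently in flight.
    this.controller.abort();
  }

  resume(): void {
    // Remaining parts are retried with a signal that is not aborted.
    this.controller = new AbortController();
  }
}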

@@ -735,9 +857,10 @@ describe('getMultipartUploadHandlers with path', () => {
});
});

afterEach(() => {
beforeEach(() => {
jest.clearAllMocks();
resetS3Mocks();
mockCalculateContentCRC32Reset();
});

it('should return multipart upload handlers', async () => {
@@ -808,6 +931,69 @@
);
});

it.each([
[
'file',
new File([getBlob(8 * MB)], 'someName'),
['JCnBsQ==', 'HELzGQ=='],
],
['blob', getBlob(8 * MB), ['JCnBsQ==', 'HELzGQ==']],
['string', 'Ü'.repeat(4 * MB), ['DL735w==', 'Akga7g==']],
['arrayBuffer', new ArrayBuffer(8 * MB), ['yTuzdQ==', 'eXJPxg==']],
['arrayBufferView', new Uint8Array(8 * MB), ['yTuzdQ==', 'eXJPxg==']],
])(
`should create crc32 for %s type body`,
async (_, twoPartsPayload, expectedCrc32) => {
mockMultipartUploadSuccess();
const { multipartUploadJob } = getMultipartUploadHandlers({
path: testPath,
data: twoPartsPayload,
});
await multipartUploadJob();

/**
* The final crc32 calculation calls calculateContentCRC32 3 times:
* once for each of the 2 parts, plus once to combine the two
* resulting part hashes.
*
* Uploading each part calls calculateContentCRC32 once more,
* adding 2 calls.
*
* These steps result in 5 calls in total.
*/
expect(calculateContentCRC32).toHaveBeenCalledTimes(5);
expect(calculateContentMd5).not.toHaveBeenCalled();
expect(mockUploadPart).toHaveBeenCalledTimes(2);
expect(mockUploadPart).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ ChecksumCRC32: expectedCrc32[0] }),
);
expect(mockUploadPart).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ ChecksumCRC32: expectedCrc32[1] }),
);
},
);

it('should use md5 if crc32 is returning undefined', async () => {
mockCalculateContentCRC32Undefined();
mockMultipartUploadSuccess();
Amplify.libraryOptions = {
Storage: {
S3: {
isObjectLockEnabled: true,
},
},
};
const { multipartUploadJob } = getMultipartUploadHandlers({
path: testPath,
data: new Uint8Array(8 * MB),
});
await multipartUploadJob();
expect(calculateContentCRC32).toHaveBeenCalledTimes(1); // the single overall crc32 attempt resolves undefined, so md5 is used instead
expect(calculateContentMd5).toHaveBeenCalledTimes(2);
expect(mockUploadPart).toHaveBeenCalledTimes(2);
});

it('should throw if unsupported payload type is provided', async () => {
mockMultipartUploadSuccess();
const { multipartUploadJob } = getMultipartUploadHandlers({
@@ -822,6 +1008,7 @@
});

it('should upload a body that exceeds the size of default part size and parts count', async () => {
mockCalculateContentCRC32Mock();
let buffer: ArrayBuffer;
const file = {
__proto__: File.prototype,
@@ -846,7 +1033,7 @@
file.size,
);
await multipartUploadJob();
expect(file.slice).toHaveBeenCalledTimes(10_000); // S3 limit of parts count
expect(file.slice).toHaveBeenCalledTimes(10_000 * 2); // S3 parts count limit, doubled because each part is sliced again for the crc32 calculation
expect(mockCreateMultipartUpload).toHaveBeenCalledTimes(1);
expect(mockUploadPart).toHaveBeenCalledTimes(10_000);
expect(mockCompleteMultipartUpload).toHaveBeenCalledTimes(1);
@@ -1277,6 +1464,14 @@

describe('pause() & resume()', () => {
it('should abort in-flight uploadPart requests if upload is paused', async () => {
let pausedOnce = false;
let resumeTest: () => void;
const waitForPause = new Promise<void>(resolve => {
resumeTest = () => {
resolve();
};
});

const { multipartUploadJob, onPause, onResume } =
getMultipartUploadHandlers({
path: testPath,
@@ -1285,16 +1480,22 @@
let partCount = 0;
mockMultipartUploadCancellation(() => {
partCount++;
if (partCount === 2) {
if (partCount === 2 && !pausedOnce) {
onPause(); // Pause upload at the last uploadPart call
resumeTest();
pausedOnce = true;
}
});
const uploadPromise = multipartUploadJob();
await waitForPause;
await getZeroDelayTimeout();

onResume();
await uploadPromise;
expect(mockUploadPart).toHaveBeenCalledTimes(2);
expect(mockUploadPart).toHaveBeenCalledTimes(3);
expect(mockUploadPart.mock.calls[0][0].abortSignal?.aborted).toBe(true);
expect(mockUploadPart.mock.calls[1][0].abortSignal?.aborted).toBe(true);
expect(mockUploadPart.mock.calls[2][0].abortSignal?.aborted).toBe(false);
});
});
