Skip to content

Commit

Permalink
Add reindexFromSnapshotWorkerSize to cdk with default and maximum mod…
Browse files Browse the repository at this point in the history
…es (#1085)

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait authored Oct 21, 2024
1 parent e2c5be7 commit db0075e
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
"migrationAssistanceEnabled": true,
"migrationConsoleServiceEnabled": true,
"trafficReplayerServiceEnabled": false,
"otelCollectorEnabled": true
"otelCollectorEnabled": true,
"reindexFromSnapshotWorkerSize": "default"
}
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,8 @@ export function makeLocalAssetContainerImage(scope: Construct, imageName: string
if (!imageId) {
throw new Error(`No RepoDigests found for image: ${imageName}`);
}
imageHash = imageId.split(':')[1];
CdkLogger.info('For image: ' + imageName + ' found hash: ' + imageHash);
imageHash = imageId.replace(/[^a-zA-Z0-9-_]/g, '_');
CdkLogger.info('For image: ' + imageName + ' found imageHash: ' + imageHash);
} catch (error) {
CdkLogger.error('Error fetching the actual hash for the image: ' + imageName + ' Error: ' + error);
throw new Error('Error fetching the image hash for the image: ' + imageName + ' Error: ' + error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ export interface ReindexFromSnapshotProps extends StackPropsExt {
readonly otelCollectorEnabled: boolean,
readonly clusterAuthDetails: ClusterAuth
readonly sourceClusterVersion?: string,
readonly maxShardSizeGiB?: number
readonly maxShardSizeGiB?: number,
readonly reindexFromSnapshotWorkerSize: "default" | "maximum",

}

Expand Down Expand Up @@ -80,14 +81,18 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {
const extraArgsDict = parseArgsToDict(props.extraArgs)
const storagePath = "/storage"
const planningSize = props.maxShardSizeGiB ?? 80;
const maxShardSizeBytes = `${planningSize * 1024 * 1024 * 1024}`
const maxShardSizeBytes = planningSize * 1024 * 1024 * 1024 * 1.10 // Add 10% buffer
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-local-dir", `"${storagePath}/s3_files"`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-repo-uri", `"${s3Uri}"`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-region", this.region)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--snapshot-name", "rfs-snapshot")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--lucene-dir", `"${storagePath}/lucene"`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-host", osClusterEndpoint)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--max-shard-size-bytes", maxShardSizeBytes)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--max-shard-size-bytes", `${maxShardSizeBytes}`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--max-connections", props.reindexFromSnapshotWorkerSize === "maximum" ? "100" : "10")
if (props.reindexFromSnapshotWorkerSize === "maximum") {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-compression")
}
if (props.clusterAuthDetails.sigv4) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-service-signing-name", props.clusterAuthDetails.sigv4.serviceSigningName)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-region", props.clusterAuthDetails.sigv4.region)
Expand Down Expand Up @@ -130,12 +135,12 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {

// Calculate the volume size based on the max shard size
// Have space for the snapshot and an unpacked copy, with buffer
const volumeSize = Math.max(
Math.ceil(planningSize * 2 * 1.15),
const volumeSizeGB = Math.max(
Math.ceil(maxShardSizeBytes/(1000**3) * 2 * 1.15),
1
)

if (volumeSize > 16000) {
if (volumeSizeGB > 16000) {
// 16 TiB is the maximum volume size for GP3
throw new Error(`"Your max shard size of ${props.maxShardSizeGiB} GiB is too large to migrate."`)
}
Expand All @@ -144,9 +149,10 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {
const snapshotVolume = new ServiceManagedVolume(this, 'SnapshotVolume', {
name: 'snapshot-volume',
managedEBSVolume: {
size: Size.gibibytes(volumeSize),
size: Size.gibibytes(volumeSizeGB),
volumeType: EbsDeviceVolumeType.GP3,
fileSystemType: FileSystemType.XFS,
throughput: props.reindexFromSnapshotWorkerSize === "maximum" ? 450 : 125,
tagSpecifications: [{
tags: {
Name: `rfs-snapshot-volume-${props.stage}`,
Expand Down Expand Up @@ -174,8 +180,8 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {
mountPoints: mountPoints,
taskRolePolicies: servicePolicies,
cpuArchitecture: props.fargateCpuArch,
taskCpuUnits: 2048,
taskMemoryLimitMiB: 4096,
taskCpuUnits: props.reindexFromSnapshotWorkerSize === "maximum" ? 16 * 1024 : 2 * 1024,
taskMemoryLimitMiB: props.reindexFromSnapshotWorkerSize === "maximum" ? 32 * 1024 : 4 * 1024,
environment: {
"RFS_COMMAND": command,
"RFS_TARGET_USER": targetUser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ export class StackComposer {
const reindexFromSnapshotServiceEnabled = this.getContextForType('reindexFromSnapshotServiceEnabled', 'boolean', defaultValues, contextJSON)
const reindexFromSnapshotExtraArgs = this.getContextForType('reindexFromSnapshotExtraArgs', 'string', defaultValues, contextJSON)
const reindexFromSnapshotMaxShardSizeGiB = this.getContextForType('reindexFromSnapshotMaxShardSizeGiB', 'number', defaultValues, contextJSON)
const reindexFromSnapshotWorkerSize = this.getContextForType('reindexFromSnapshotWorkerSize', 'string', defaultValues, contextJSON)
const albAcmCertArn = this.getContextForType('albAcmCertArn', 'string', defaultValues, contextJSON);
const managedServiceSourceSnapshotEnabled = this.getContextForType('managedServiceSourceSnapshotEnabled', 'boolean', defaultValues, contextJSON)

Expand Down Expand Up @@ -282,6 +283,10 @@ export class StackComposer {
const targetVersion = targetCluster?.version ? this.getEngineVersion(targetCluster?.version) : null
const engineVersionValue = engineVersion ? this.getEngineVersion(engineVersion) : this.getEngineVersion('OS_2.15')

if (reindexFromSnapshotWorkerSize !== "default" && reindexFromSnapshotWorkerSize !== "maximum") {
throw new Error("Invalid value for reindexFromSnapshotWorkerSize, must be either 'default' or 'maximum'")
}

const requiredFields: { [key: string]: any; } = {"stage":stage}
for (let key in requiredFields) {
if (!requiredFields[key]) {
Expand Down Expand Up @@ -490,7 +495,8 @@ export class StackComposer {
defaultDeployId: defaultDeployId,
fargateCpuArch: fargateCpuArch,
env: props.env,
maxShardSizeGiB: reindexFromSnapshotMaxShardSizeGiB
maxShardSizeGiB: reindexFromSnapshotMaxShardSizeGiB,
reindexFromSnapshotWorkerSize
})
this.addDependentStacks(reindexFromSnapshotStack, [migrationStack, openSearchStack, osContainerStack])
this.stacks.push(reindexFromSnapshotStack)
Expand Down
1 change: 1 addition & 0 deletions deployment/cdk/opensearch-service-migration/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ In all other cases, the required components of each cluster object are:
| sourceClusterEndpoint | string | `"https://source-cluster.elb.us-east-1.endpoint.com"` | The endpoint for the source cluster from which RFS will take a snapshot |
| managedServiceSourceSnapshotEnabled | boolean | true | Create the necessary roles and trust relationships to take a snapshot of a managed service source cluster. This is only compatible with SigV4 auth. |
| reindexFromSnapshotMaxShardSizeGiB | integer | 80 | OPTIONAL: The size, in whole GiB, of the largest shard you want to migrate across all indices; used to ensure we have enough disk space reserved to perform the migration. Default: 80 GiB |
| reindexFromSnapshotWorkerSize | enum | default | maximum | OPTIONAL: default provisions a 2vCPU worker balancing speed with cost efficiency designed for most migrations with horizontal scaling, maximum provisions a 16vCPU worker for high throughput migrations when parallelization is limited (low source shard count). Default: default |

### VPC Options

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"engineVersion": "OS_1.0",
"domainName": "sample-cdk-unit-test-domain",
"defaultFargateCpuArch": "X86_64"
"defaultFargateCpuArch": "X86_64",
"reindexFromSnapshotWorkerSize": "default"
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ describe('ReindexFromSnapshotStack Tests', () => {
],
}
});
// Assert CPU configuration
template.hasResourceProperties('AWS::ECS::TaskDefinition', {
Cpu: "2048",
Memory: "4096",
});
});

test('ReindexFromSnapshotStack sets correct RFS command', () => {
Expand Down Expand Up @@ -119,7 +124,7 @@ describe('ReindexFromSnapshotStack Tests', () => {
{
"Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter",
},
" --max-shard-size-bytes 85899345920 --source-version \"ES_7.10\""
" --max-shard-size-bytes 94489280512 --max-connections 10 --source-version \"ES_7.10\""
],
],
}
Expand Down Expand Up @@ -188,7 +193,7 @@ describe('ReindexFromSnapshotStack Tests', () => {
{
"Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter",
},
" --max-shard-size-bytes 85899345920 --target-aws-service-signing-name aoss --target-aws-region eu-west-1 --source-version \"ES_7.10\"",
" --max-shard-size-bytes 94489280512 --max-connections 10 --target-aws-service-signing-name aoss --target-aws-region eu-west-1 --source-version \"ES_7.10\"",
],
],
}
Expand Down Expand Up @@ -279,7 +284,7 @@ describe('ReindexFromSnapshotStack Tests', () => {
{
"Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter",
},
" --max-shard-size-bytes 85899345920 --source-version \"ES_7.10\" --custom-arg value --flag --snapshot-name \"custom-snapshot\""
" --max-shard-size-bytes 94489280512 --max-connections 10 --source-version \"ES_7.10\" --custom-arg value --flag --snapshot-name \"custom-snapshot\""
]
]
}
Expand All @@ -302,4 +307,91 @@ describe('ReindexFromSnapshotStack Tests', () => {
}
]);
});

test('ReindexFromSnapshotStack with maximum worker size', () => {
const contextOptions = {
vpcEnabled: true,
reindexFromSnapshotServiceEnabled: true,
stage: 'unit-test',
sourceCluster: {
"endpoint": "https://test-cluster",
"auth": {"type": "none"},
"version": "ES_7.10"
},
reindexFromSnapshotWorkerSize: "maximum",
migrationAssistanceEnabled: true,
};

const stacks = createStackComposer(contextOptions);
const reindexStack = stacks.stacks.find(s => s instanceof ReindexFromSnapshotStack) as ReindexFromSnapshotStack;
expect(reindexStack).toBeDefined();
const template = Template.fromStack(reindexStack);

const taskDefinitionCapture = new Capture();
template.hasResourceProperties('AWS::ECS::TaskDefinition', {
ContainerDefinitions: taskDefinitionCapture,
});

const containerDefinitions = taskDefinitionCapture.asArray();
expect(containerDefinitions.length).toBe(1);
expect(containerDefinitions[0].Command).toEqual([
'/bin/sh',
'-c',
'/rfs-app/entrypoint.sh'
]);
expect(containerDefinitions[0].Environment).toEqual([
{
Name: 'RFS_COMMAND',
Value: {
"Fn::Join": [
"",
[ "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir \"/storage/s3_files\" --s3-repo-uri \"s3://migration-artifacts-test-account-unit-test-us-east-1/rfs-snapshot-repo\" --s3-region us-east-1 --snapshot-name rfs-snapshot --lucene-dir \"/storage/lucene\" --target-host ",
{
"Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter",
},
" --max-shard-size-bytes 94489280512 --max-connections 100 --target-compression --source-version \"ES_7.10\"",
],
],
}
},
{
Name: 'RFS_TARGET_USER',
Value: ''
},
{
Name: 'RFS_TARGET_PASSWORD',
Value: ''
},
{
Name: 'RFS_TARGET_PASSWORD_ARN',
Value: ''
},
{
Name: 'SHARED_LOGS_DIR_PATH',
Value: '/shared-logs-output/reindex-from-snapshot-default'
}
]);

// Assert CPU configuration
template.hasResourceProperties('AWS::ECS::TaskDefinition', {
Cpu: "16384",
Memory: "32768",
});
const volumesCapture = new Capture();
template.hasResourceProperties('AWS::ECS::Service', {
VolumeConfigurations: volumesCapture,
});
const volumes = volumesCapture.asArray();
expect(volumes).toEqual(
expect.arrayContaining([
expect.objectContaining({
ManagedEBSVolume: expect.objectContaining({
Encrypted: true,
SizeInGiB: 218,
Throughput: 450,
}),
}),
])
);
});
});

0 comments on commit db0075e

Please sign in to comment.