From db0075e89947de3b3f796db6f7ddfe3c65163e7b Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Mon, 21 Oct 2024 11:36:46 -0500 Subject: [PATCH] Add reindexFromSnapshotWorkerSize to cdk with default and maximum modes (#1085) Signed-off-by: Andre Kurait --- .../default-values.json | 3 +- .../lib/common-utilities.ts | 4 +- .../reindex-from-snapshot-stack.ts | 24 +++-- .../lib/stack-composer.ts | 8 +- .../opensearch-service-migration/options.md | 1 + .../test/default-values-test.json | 3 +- .../test/reindex-from-snapshot-stack.test.ts | 98 ++++++++++++++++++- 7 files changed, 124 insertions(+), 17 deletions(-) diff --git a/deployment/cdk/opensearch-service-migration/default-values.json b/deployment/cdk/opensearch-service-migration/default-values.json index e92fbf17f..d6ee3de7e 100644 --- a/deployment/cdk/opensearch-service-migration/default-values.json +++ b/deployment/cdk/opensearch-service-migration/default-values.json @@ -4,5 +4,6 @@ "migrationAssistanceEnabled": true, "migrationConsoleServiceEnabled": true, "trafficReplayerServiceEnabled": false, - "otelCollectorEnabled": true + "otelCollectorEnabled": true, + "reindexFromSnapshotWorkerSize": "default" } diff --git a/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts b/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts index e6d55869e..aae412db0 100644 --- a/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts +++ b/deployment/cdk/opensearch-service-migration/lib/common-utilities.ts @@ -470,8 +470,8 @@ export function makeLocalAssetContainerImage(scope: Construct, imageName: string if (!imageId) { throw new Error(`No RepoDigests found for image: ${imageName}`); } - imageHash = imageId.split(':')[1]; - CdkLogger.info('For image: ' + imageName + ' found hash: ' + imageHash); + imageHash = imageId.replace(/[^a-zA-Z0-9-_]/g, '_'); + CdkLogger.info('For image: ' + imageName + ' found imageHash: ' + imageHash); } catch (error) { CdkLogger.error('Error fetching the actual 
hash for the image: ' + imageName + ' Error: ' + error); throw new Error('Error fetching the image hash for the image: ' + imageName + ' Error: ' + error); diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts index 0a172252c..642836eb4 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts @@ -31,7 +31,8 @@ export interface ReindexFromSnapshotProps extends StackPropsExt { readonly otelCollectorEnabled: boolean, readonly clusterAuthDetails: ClusterAuth readonly sourceClusterVersion?: string, - readonly maxShardSizeGiB?: number + readonly maxShardSizeGiB?: number, + readonly reindexFromSnapshotWorkerSize: "default" | "maximum", } @@ -80,14 +81,18 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore { const extraArgsDict = parseArgsToDict(props.extraArgs) const storagePath = "/storage" const planningSize = props.maxShardSizeGiB ?? 
80; - const maxShardSizeBytes = `${planningSize * 1024 * 1024 * 1024}` + const maxShardSizeBytes = planningSize * 1024 * 1024 * 1024 * 1.10 // Add 10% buffer command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-local-dir", `"${storagePath}/s3_files"`) command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-repo-uri", `"${s3Uri}"`) command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-region", this.region) command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--snapshot-name", "rfs-snapshot") command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--lucene-dir", `"${storagePath}/lucene"`) command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-host", osClusterEndpoint) - command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--max-shard-size-bytes", maxShardSizeBytes) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--max-shard-size-bytes", `${maxShardSizeBytes}`) + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--max-connections", props.reindexFromSnapshotWorkerSize === "maximum" ? 
"100" : "10") + if (props.reindexFromSnapshotWorkerSize === "maximum") { + command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-compression") + } if (props.clusterAuthDetails.sigv4) { command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-service-signing-name", props.clusterAuthDetails.sigv4.serviceSigningName) command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-region", props.clusterAuthDetails.sigv4.region) @@ -130,12 +135,12 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore { // Calculate the volume size based on the max shard size // Have space for the snapshot and an unpacked copy, with buffer - const volumeSize = Math.max( - Math.ceil(planningSize * 2 * 1.15), + const volumeSizeGB = Math.max( + Math.ceil(maxShardSizeBytes/(1000**3) * 2 * 1.15), 1 ) - if (volumeSize > 16000) { + if (volumeSizeGB > 16000) { // 16 TiB is the maximum volume size for GP3 throw new Error(`"Your max shard size of ${props.maxShardSizeGiB} GiB is too large to migrate."`) } @@ -144,9 +149,10 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore { const snapshotVolume = new ServiceManagedVolume(this, 'SnapshotVolume', { name: 'snapshot-volume', managedEBSVolume: { - size: Size.gibibytes(volumeSize), + size: Size.gibibytes(volumeSizeGB), volumeType: EbsDeviceVolumeType.GP3, fileSystemType: FileSystemType.XFS, + throughput: props.reindexFromSnapshotWorkerSize === "maximum" ? 450 : 125, tagSpecifications: [{ tags: { Name: `rfs-snapshot-volume-${props.stage}`, @@ -174,8 +180,8 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore { mountPoints: mountPoints, taskRolePolicies: servicePolicies, cpuArchitecture: props.fargateCpuArch, - taskCpuUnits: 2048, - taskMemoryLimitMiB: 4096, + taskCpuUnits: props.reindexFromSnapshotWorkerSize === "maximum" ? 16 * 1024 : 2 * 1024, + taskMemoryLimitMiB: props.reindexFromSnapshotWorkerSize === "maximum" ? 
32 * 1024 : 4 * 1024, environment: { "RFS_COMMAND": command, "RFS_TARGET_USER": targetUser, diff --git a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts index 806002c93..8724712c6 100644 --- a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts +++ b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts @@ -210,6 +210,7 @@ export class StackComposer { const reindexFromSnapshotServiceEnabled = this.getContextForType('reindexFromSnapshotServiceEnabled', 'boolean', defaultValues, contextJSON) const reindexFromSnapshotExtraArgs = this.getContextForType('reindexFromSnapshotExtraArgs', 'string', defaultValues, contextJSON) const reindexFromSnapshotMaxShardSizeGiB = this.getContextForType('reindexFromSnapshotMaxShardSizeGiB', 'number', defaultValues, contextJSON) + const reindexFromSnapshotWorkerSize = this.getContextForType('reindexFromSnapshotWorkerSize', 'string', defaultValues, contextJSON) const albAcmCertArn = this.getContextForType('albAcmCertArn', 'string', defaultValues, contextJSON); const managedServiceSourceSnapshotEnabled = this.getContextForType('managedServiceSourceSnapshotEnabled', 'boolean', defaultValues, contextJSON) @@ -282,6 +283,10 @@ export class StackComposer { const targetVersion = targetCluster?.version ? this.getEngineVersion(targetCluster?.version) : null const engineVersionValue = engineVersion ? 
this.getEngineVersion(engineVersion) : this.getEngineVersion('OS_2.15') + if (reindexFromSnapshotWorkerSize !== "default" && reindexFromSnapshotWorkerSize !== "maximum") { + throw new Error("Invalid value for reindexFromSnapshotWorkerSize, must be either 'default' or 'maximum'") + } + const requiredFields: { [key: string]: any; } = {"stage":stage} for (let key in requiredFields) { if (!requiredFields[key]) { @@ -490,7 +495,8 @@ export class StackComposer { defaultDeployId: defaultDeployId, fargateCpuArch: fargateCpuArch, env: props.env, - maxShardSizeGiB: reindexFromSnapshotMaxShardSizeGiB + maxShardSizeGiB: reindexFromSnapshotMaxShardSizeGiB, + reindexFromSnapshotWorkerSize }) this.addDependentStacks(reindexFromSnapshotStack, [migrationStack, openSearchStack, osContainerStack]) this.stacks.push(reindexFromSnapshotStack) diff --git a/deployment/cdk/opensearch-service-migration/options.md b/deployment/cdk/opensearch-service-migration/options.md index b91864f2a..d7b7826a6 100644 --- a/deployment/cdk/opensearch-service-migration/options.md +++ b/deployment/cdk/opensearch-service-migration/options.md @@ -58,6 +58,7 @@ In all other cases, the required components of each cluster object are: | sourceClusterEndpoint | string | `"https://source-cluster.elb.us-east-1.endpoint.com"` | The endpoint for the source cluster from which RFS will take a snapshot | | managedServiceSourceSnapshotEnabled | boolean | true | Create the necessary roles and trust relationships to take a snapshot of a managed service source cluster. This is only compatible with SigV4 auth. | | reindexFromSnapshotMaxShardSizeGiB | integer | 80 | OPTIONAL: The size, in whole GiB, of the largest shard you want to migrate across all indices; used to ensure we have enough disk space reserved to perform the migration. 
Default: 80 GiB | +| reindexFromSnapshotWorkerSize | enum | default \| maximum | OPTIONAL: `default` provisions a 2 vCPU worker that balances speed with cost efficiency and suits most migrations that scale horizontally; `maximum` provisions a 16 vCPU worker for high-throughput migrations when parallelization is limited (low source shard count). Default: default | ### VPC Options diff --git a/deployment/cdk/opensearch-service-migration/test/default-values-test.json b/deployment/cdk/opensearch-service-migration/test/default-values-test.json index b2bd52ef0..38349bf7b 100644 --- a/deployment/cdk/opensearch-service-migration/test/default-values-test.json +++ b/deployment/cdk/opensearch-service-migration/test/default-values-test.json @@ -1,5 +1,6 @@ { "engineVersion": "OS_1.0", "domainName": "sample-cdk-unit-test-domain", - "defaultFargateCpuArch": "X86_64" + "defaultFargateCpuArch": "X86_64", + "reindexFromSnapshotWorkerSize": "default" } \ No newline at end of file diff --git a/deployment/cdk/opensearch-service-migration/test/reindex-from-snapshot-stack.test.ts b/deployment/cdk/opensearch-service-migration/test/reindex-from-snapshot-stack.test.ts index e8aeed273..59ebeff77 100644 --- a/deployment/cdk/opensearch-service-migration/test/reindex-from-snapshot-stack.test.ts +++ b/deployment/cdk/opensearch-service-migration/test/reindex-from-snapshot-stack.test.ts @@ -72,6 +72,11 @@ describe('ReindexFromSnapshotStack Tests', () => { ], } }); + // Assert CPU configuration + template.hasResourceProperties('AWS::ECS::TaskDefinition', { + Cpu: "2048", + Memory: "4096", + }); }); test('ReindexFromSnapshotStack sets correct RFS command', () => { @@ -119,7 +124,7 @@ describe('ReindexFromSnapshotStack Tests', () => { { "Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter", }, - " --max-shard-size-bytes 85899345920 --source-version \"ES_7.10\"" + " --max-shard-size-bytes 94489280512 --max-connections 10 --source-version \"ES_7.10\"" ], 
], } @@ -188,7 +193,7 @@ describe('ReindexFromSnapshotStack Tests', () => { { "Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter", }, - " --max-shard-size-bytes 85899345920 --target-aws-service-signing-name aoss --target-aws-region eu-west-1 --source-version \"ES_7.10\"", + " --max-shard-size-bytes 94489280512 --max-connections 10 --target-aws-service-signing-name aoss --target-aws-region eu-west-1 --source-version \"ES_7.10\"", ], ], } @@ -279,7 +284,7 @@ describe('ReindexFromSnapshotStack Tests', () => { { "Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter", }, - " --max-shard-size-bytes 85899345920 --source-version \"ES_7.10\" --custom-arg value --flag --snapshot-name \"custom-snapshot\"" + " --max-shard-size-bytes 94489280512 --max-connections 10 --source-version \"ES_7.10\" --custom-arg value --flag --snapshot-name \"custom-snapshot\"" ] ] } @@ -302,4 +307,91 @@ describe('ReindexFromSnapshotStack Tests', () => { } ]); }); + + test('ReindexFromSnapshotStack with maximum worker size', () => { + const contextOptions = { + vpcEnabled: true, + reindexFromSnapshotServiceEnabled: true, + stage: 'unit-test', + sourceCluster: { + "endpoint": "https://test-cluster", + "auth": {"type": "none"}, + "version": "ES_7.10" + }, + reindexFromSnapshotWorkerSize: "maximum", + migrationAssistanceEnabled: true, + }; + + const stacks = createStackComposer(contextOptions); + const reindexStack = stacks.stacks.find(s => s instanceof ReindexFromSnapshotStack) as ReindexFromSnapshotStack; + expect(reindexStack).toBeDefined(); + const template = Template.fromStack(reindexStack); + + const taskDefinitionCapture = new Capture(); + template.hasResourceProperties('AWS::ECS::TaskDefinition', { + ContainerDefinitions: taskDefinitionCapture, + }); + + const containerDefinitions = taskDefinitionCapture.asArray(); + expect(containerDefinitions.length).toBe(1); + 
expect(containerDefinitions[0].Command).toEqual([ + '/bin/sh', + '-c', + '/rfs-app/entrypoint.sh' + ]); + expect(containerDefinitions[0].Environment).toEqual([ + { + Name: 'RFS_COMMAND', + Value: { + "Fn::Join": [ + "", + [ "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir \"/storage/s3_files\" --s3-repo-uri \"s3://migration-artifacts-test-account-unit-test-us-east-1/rfs-snapshot-repo\" --s3-region us-east-1 --snapshot-name rfs-snapshot --lucene-dir \"/storage/lucene\" --target-host ", + { + "Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter", + }, + " --max-shard-size-bytes 94489280512 --max-connections 100 --target-compression --source-version \"ES_7.10\"", + ], + ], + } + }, + { + Name: 'RFS_TARGET_USER', + Value: '' + }, + { + Name: 'RFS_TARGET_PASSWORD', + Value: '' + }, + { + Name: 'RFS_TARGET_PASSWORD_ARN', + Value: '' + }, + { + Name: 'SHARED_LOGS_DIR_PATH', + Value: '/shared-logs-output/reindex-from-snapshot-default' + } + ]); + + // Assert CPU configuration + template.hasResourceProperties('AWS::ECS::TaskDefinition', { + Cpu: "16384", + Memory: "32768", + }); + const volumesCapture = new Capture(); + template.hasResourceProperties('AWS::ECS::Service', { + VolumeConfigurations: volumesCapture, + }); + const volumes = volumesCapture.asArray(); + expect(volumes).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + ManagedEBSVolume: expect.objectContaining({ + Encrypted: true, + SizeInGiB: 218, + Throughput: 450, + }), + }), + ]) + ); + }); });