Add reindexFromSnapshotWorkerSize to cdk with default and maximum modes #1085

Merged 1 commit on Oct 21, 2024
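The new `reindexFromSnapshotWorkerSize` context option selects between two worker profiles. As a quick orientation, here is a minimal sketch (the type and function names are assumptions for illustration, not code from this PR) summarizing the resource values the changes below wire up:

```typescript
// Illustrative only: summarizes the values introduced in the ReindexFromSnapshotStack changes below.
type ReindexFromSnapshotWorkerSize = "default" | "maximum";

function workerResources(size: ReindexFromSnapshotWorkerSize) {
    const isMax = size === "maximum";
    return {
        taskCpuUnits: isMax ? 16 * 1024 : 2 * 1024,        // 16 vCPU vs 2 vCPU Fargate task
        taskMemoryLimitMiB: isMax ? 32 * 1024 : 4 * 1024,  // 32 GiB vs 4 GiB
        maxTargetConnections: isMax ? 100 : 10,            // passed as --max-connections
        ebsThroughputMiBps: isMax ? 450 : 125,             // GP3 snapshot volume throughput
        targetCompression: isMax,                          // adds --target-compression when true
    };
}
```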
@@ -4,5 +4,6 @@
"migrationAssistanceEnabled": true,
"migrationConsoleServiceEnabled": true,
"trafficReplayerServiceEnabled": false,
"otelCollectorEnabled": true
"otelCollectorEnabled": true,
"reindexFromSnapshotWorkerSize": "default"
}
@@ -470,8 +470,8 @@ export function makeLocalAssetContainerImage(scope: Construct, imageName: string
if (!imageId) {
throw new Error(`No RepoDigests found for image: ${imageName}`);
}
imageHash = imageId.split(':')[1];
CdkLogger.info('For image: ' + imageName + ' found hash: ' + imageHash);
imageHash = imageId.replace(/[^a-zA-Z0-9-_]/g, '_');
CdkLogger.info('For image: ' + imageName + ' found imageHash: ' + imageHash);
} catch (error) {
CdkLogger.error('Error fetching the actual hash for the image: ' + imageName + ' Error: ' + error);
throw new Error('Error fetching the image hash for the image: ' + imageName + ' Error: ' + error);
@@ -31,7 +31,8 @@ export interface ReindexFromSnapshotProps extends StackPropsExt {
readonly otelCollectorEnabled: boolean,
readonly clusterAuthDetails: ClusterAuth
readonly sourceClusterVersion?: string,
readonly maxShardSizeGiB?: number
readonly maxShardSizeGiB?: number,
readonly reindexFromSnapshotWorkerSize: "default" | "maximum",

}

@@ -80,14 +81,18 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {
const extraArgsDict = parseArgsToDict(props.extraArgs)
const storagePath = "/storage"
const planningSize = props.maxShardSizeGiB ?? 80;
const maxShardSizeBytes = `${planningSize * 1024 * 1024 * 1024}`
const maxShardSizeBytes = planningSize * 1024 * 1024 * 1024 * 1.10 // Add 10% buffer
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-local-dir", `"${storagePath}/s3_files"`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-repo-uri", `"${s3Uri}"`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--s3-region", this.region)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--snapshot-name", "rfs-snapshot")
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--lucene-dir", `"${storagePath}/lucene"`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-host", osClusterEndpoint)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--max-shard-size-bytes", maxShardSizeBytes)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--max-shard-size-bytes", `${maxShardSizeBytes}`)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--max-connections", props.reindexFromSnapshotWorkerSize === "maximum" ? "100" : "10")
if (props.reindexFromSnapshotWorkerSize === "maximum") {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-compression")
}
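// Net effect of the worker size on the RFS command: "maximum" gains "--max-connections 100 --target-compression",
// while "default" uses "--max-connections 10" (see the updated unit test expectations below).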
if (props.clusterAuthDetails.sigv4) {
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-service-signing-name", props.clusterAuthDetails.sigv4.serviceSigningName)
command = appendArgIfNotInExtraArgs(command, extraArgsDict, "--target-aws-region", props.clusterAuthDetails.sigv4.region)
@@ -130,12 +135,12 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {

// Calculate the volume size based on the max shard size
// Have space for the snapshot and an unpacked copy, with buffer
const volumeSize = Math.max(
Math.ceil(planningSize * 2 * 1.15),
const volumeSizeGB = Math.max(
Math.ceil(maxShardSizeBytes/(1000**3) * 2 * 1.15),
1
)
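// Worked example with the default 80 GiB max shard size:
//   maxShardSizeBytes = 80 GiB * 1.10 = 94489280512 bytes
//   volumeSizeGB      = ceil(94.49 * 2 * 1.15) = 218
// which matches the 218 GiB volume asserted in the unit tests below.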

if (volumeSize > 16000) {
if (volumeSizeGB > 16000) {
// 16 TiB is the maximum volume size for GP3
throw new Error(`"Your max shard size of ${props.maxShardSizeGiB} GiB is too large to migrate."`)
}
@@ -144,9 +149,10 @@
const snapshotVolume = new ServiceManagedVolume(this, 'SnapshotVolume', {
name: 'snapshot-volume',
managedEBSVolume: {
size: Size.gibibytes(volumeSize),
size: Size.gibibytes(volumeSizeGB),
volumeType: EbsDeviceVolumeType.GP3,
fileSystemType: FileSystemType.XFS,
throughput: props.reindexFromSnapshotWorkerSize === "maximum" ? 450 : 125,
tagSpecifications: [{
tags: {
Name: `rfs-snapshot-volume-${props.stage}`,
@@ -174,8 +180,8 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {
mountPoints: mountPoints,
taskRolePolicies: servicePolicies,
cpuArchitecture: props.fargateCpuArch,
taskCpuUnits: 2048,
taskMemoryLimitMiB: 4096,
taskCpuUnits: props.reindexFromSnapshotWorkerSize === "maximum" ? 16 * 1024 : 2 * 1024,
taskMemoryLimitMiB: props.reindexFromSnapshotWorkerSize === "maximum" ? 32 * 1024 : 4 * 1024,
environment: {
"RFS_COMMAND": command,
"RFS_TARGET_USER": targetUser,
@@ -210,6 +210,7 @@ export class StackComposer {
const reindexFromSnapshotServiceEnabled = this.getContextForType('reindexFromSnapshotServiceEnabled', 'boolean', defaultValues, contextJSON)
const reindexFromSnapshotExtraArgs = this.getContextForType('reindexFromSnapshotExtraArgs', 'string', defaultValues, contextJSON)
const reindexFromSnapshotMaxShardSizeGiB = this.getContextForType('reindexFromSnapshotMaxShardSizeGiB', 'number', defaultValues, contextJSON)
const reindexFromSnapshotWorkerSize = this.getContextForType('reindexFromSnapshotWorkerSize', 'string', defaultValues, contextJSON)
const albAcmCertArn = this.getContextForType('albAcmCertArn', 'string', defaultValues, contextJSON);
const managedServiceSourceSnapshotEnabled = this.getContextForType('managedServiceSourceSnapshotEnabled', 'boolean', defaultValues, contextJSON)

@@ -282,6 +283,10 @@ export class StackComposer {
const targetVersion = targetCluster?.version ? this.getEngineVersion(targetCluster?.version) : null
const engineVersionValue = engineVersion ? this.getEngineVersion(engineVersion) : this.getEngineVersion('OS_2.15')

if (reindexFromSnapshotWorkerSize !== "default" && reindexFromSnapshotWorkerSize !== "maximum") {
throw new Error("Invalid value for reindexFromSnapshotWorkerSize, must be either 'default' or 'maximum'")
}
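// Fail fast on anything other than the two supported sizes. The default context values
// (updated in the first hunk of this PR) now supply "default", so an omitted option still passes.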

const requiredFields: { [key: string]: any; } = {"stage":stage}
for (let key in requiredFields) {
if (!requiredFields[key]) {
@@ -490,7 +495,8 @@ export class StackComposer {
defaultDeployId: defaultDeployId,
fargateCpuArch: fargateCpuArch,
env: props.env,
maxShardSizeGiB: reindexFromSnapshotMaxShardSizeGiB
maxShardSizeGiB: reindexFromSnapshotMaxShardSizeGiB,
reindexFromSnapshotWorkerSize
})
this.addDependentStacks(reindexFromSnapshotStack, [migrationStack, openSearchStack, osContainerStack])
this.stacks.push(reindexFromSnapshotStack)
1 change: 1 addition & 0 deletions deployment/cdk/opensearch-service-migration/options.md
@@ -58,6 +58,7 @@ In all other cases, the required components of each cluster object are:
| sourceClusterEndpoint | string | `"https://source-cluster.elb.us-east-1.endpoint.com"` | The endpoint for the source cluster from which RFS will take a snapshot |
| managedServiceSourceSnapshotEnabled | boolean | true | Create the necessary roles and trust relationships to take a snapshot of a managed service source cluster. This is only compatible with SigV4 auth. |
| reindexFromSnapshotMaxShardSizeGiB | integer | 80 | OPTIONAL: The size, in whole GiB, of the largest shard you want to migrate across all indices; used to ensure we have enough disk space reserved to perform the migration. Default: 80 GiB |
| reindexFromSnapshotWorkerSize | enum | default \| maximum | OPTIONAL: "default" provisions a 2 vCPU worker that balances speed and cost efficiency and suits most migrations, which can scale horizontally; "maximum" provisions a 16 vCPU worker for high-throughput migrations where parallelization is limited (a low source shard count). Default: default |

### VPC Options

@@ -1,5 +1,6 @@
{
"engineVersion": "OS_1.0",
"domainName": "sample-cdk-unit-test-domain",
"defaultFargateCpuArch": "X86_64"
"defaultFargateCpuArch": "X86_64",
"reindexFromSnapshotWorkerSize": "default"
}
@@ -72,6 +72,11 @@ describe('ReindexFromSnapshotStack Tests', () => {
],
}
});
// Assert CPU configuration
template.hasResourceProperties('AWS::ECS::TaskDefinition', {
Cpu: "2048",
Memory: "4096",
});
});

test('ReindexFromSnapshotStack sets correct RFS command', () => {
@@ -119,7 +124,7 @@ describe('ReindexFromSnapshotStack Tests', () => {
{
"Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter",
},
" --max-shard-size-bytes 85899345920 --source-version \"ES_7.10\""
" --max-shard-size-bytes 94489280512 --max-connections 10 --source-version \"ES_7.10\""
],
],
}
@@ -188,7 +193,7 @@ describe('ReindexFromSnapshotStack Tests', () => {
{
"Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter",
},
" --max-shard-size-bytes 85899345920 --target-aws-service-signing-name aoss --target-aws-region eu-west-1 --source-version \"ES_7.10\"",
" --max-shard-size-bytes 94489280512 --max-connections 10 --target-aws-service-signing-name aoss --target-aws-region eu-west-1 --source-version \"ES_7.10\"",
],
],
}
@@ -279,7 +284,7 @@ describe('ReindexFromSnapshotStack Tests', () => {
{
"Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter",
},
" --max-shard-size-bytes 85899345920 --source-version \"ES_7.10\" --custom-arg value --flag --snapshot-name \"custom-snapshot\""
" --max-shard-size-bytes 94489280512 --max-connections 10 --source-version \"ES_7.10\" --custom-arg value --flag --snapshot-name \"custom-snapshot\""
]
]
}
@@ -302,4 +307,91 @@ describe('ReindexFromSnapshotStack Tests', () => {
}
]);
});

test('ReindexFromSnapshotStack with maximum worker size', () => {
const contextOptions = {
vpcEnabled: true,
reindexFromSnapshotServiceEnabled: true,
stage: 'unit-test',
sourceCluster: {
"endpoint": "https://test-cluster",
"auth": {"type": "none"},
"version": "ES_7.10"
},
reindexFromSnapshotWorkerSize: "maximum",
migrationAssistanceEnabled: true,
};

const stacks = createStackComposer(contextOptions);
const reindexStack = stacks.stacks.find(s => s instanceof ReindexFromSnapshotStack) as ReindexFromSnapshotStack;
expect(reindexStack).toBeDefined();
const template = Template.fromStack(reindexStack);

const taskDefinitionCapture = new Capture();
template.hasResourceProperties('AWS::ECS::TaskDefinition', {
ContainerDefinitions: taskDefinitionCapture,
});

const containerDefinitions = taskDefinitionCapture.asArray();
expect(containerDefinitions.length).toBe(1);
expect(containerDefinitions[0].Command).toEqual([
'/bin/sh',
'-c',
'/rfs-app/entrypoint.sh'
]);
expect(containerDefinitions[0].Environment).toEqual([
{
Name: 'RFS_COMMAND',
Value: {
"Fn::Join": [
"",
[ "/rfs-app/runJavaWithClasspath.sh org.opensearch.migrations.RfsMigrateDocuments --s3-local-dir \"/storage/s3_files\" --s3-repo-uri \"s3://migration-artifacts-test-account-unit-test-us-east-1/rfs-snapshot-repo\" --s3-region us-east-1 --snapshot-name rfs-snapshot --lucene-dir \"/storage/lucene\" --target-host ",
{
"Ref": "SsmParameterValuemigrationunittestdefaultosClusterEndpointC96584B6F00A464EAD1953AFF4B05118Parameter",
},
" --max-shard-size-bytes 94489280512 --max-connections 100 --target-compression --source-version \"ES_7.10\"",
],
],
}
},
{
Name: 'RFS_TARGET_USER',
Value: ''
},
{
Name: 'RFS_TARGET_PASSWORD',
Value: ''
},
{
Name: 'RFS_TARGET_PASSWORD_ARN',
Value: ''
},
{
Name: 'SHARED_LOGS_DIR_PATH',
Value: '/shared-logs-output/reindex-from-snapshot-default'
}
]);

// Assert CPU configuration
template.hasResourceProperties('AWS::ECS::TaskDefinition', {
Cpu: "16384",
Memory: "32768",
});
const volumesCapture = new Capture();
template.hasResourceProperties('AWS::ECS::Service', {
VolumeConfigurations: volumesCapture,
});
const volumes = volumesCapture.asArray();
expect(volumes).toEqual(
expect.arrayContaining([
expect.objectContaining({
ManagedEBSVolume: expect.objectContaining({
Encrypted: true,
SizeInGiB: 218,
Throughput: 450,
}),
}),
])
);
});
});