From 8c6fa534715e2e1a7d025d959d62f752e5aba8ff Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Tue, 13 Feb 2024 11:08:38 -0500 Subject: [PATCH 1/2] FEATURE: Add ability to export Kafka records (#488) * MIGRATIONS-1459: Add ability to export Kafka records Signed-off-by: Tanner Lewis --- .../migrations/common/CommonUtils.groovy | 22 ++- TrafficCapture/dockerSolution/build.gradle | 7 +- .../main/docker/migrationConsole/Dockerfile | 23 ++- .../docker/migrationConsole/kafkaExport.sh | 87 +++++++++ .../migrationConsole/runTestBenchmarks.sh | 1 - .../migrations/replay/KafkaPrinter.java | 174 ++++++++++++++++-- .../migrations/replay/KafkaPrinterTest.java | 101 +++++++--- .../lib/migration-assistance-stack.ts | 12 ++ .../service-stacks/capture-proxy-es-stack.ts | 2 +- .../lib/service-stacks/capture-proxy-stack.ts | 2 +- .../lib/service-stacks/elasticsearch-stack.ts | 2 +- .../migration-analytics-stack.ts | 2 +- .../service-stacks/migration-console-stack.ts | 19 +- .../service-stacks/migration-service-core.ts | 11 +- .../service-stacks/traffic-replayer-stack.ts | 2 +- .../opensearch-service-migration/options.md | 2 +- 16 files changed, 391 insertions(+), 78 deletions(-) create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/kafkaExport.sh diff --git a/TrafficCapture/buildSrc/src/main/groovy/org/opensearch/migrations/common/CommonUtils.groovy b/TrafficCapture/buildSrc/src/main/groovy/org/opensearch/migrations/common/CommonUtils.groovy index e7c01f202..73b8a28b5 100644 --- a/TrafficCapture/buildSrc/src/main/groovy/org/opensearch/migrations/common/CommonUtils.groovy +++ b/TrafficCapture/buildSrc/src/main/groovy/org/opensearch/migrations/common/CommonUtils.groovy @@ -3,7 +3,6 @@ package org.opensearch.migrations.common import org.gradle.api.tasks.Copy import org.gradle.api.Project import com.bmuschko.gradle.docker.tasks.image.Dockerfile -import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage class CommonUtils { static def calculateDockerHash(def projectName, def project) { @@ -23,17 +22,20 @@ class CommonUtils { } static def copyArtifact(Project project, String projectName) { - def dockerBuildDir = "build/docker/${projectName}" - def artifactsDir = "${dockerBuildDir}/jars" - project.task("copyArtifact_${projectName}", type: Copy) { - dependsOn ":${projectName}:build" - dependsOn ":${projectName}:jar" - if (projectName == "trafficCaptureProxyServerTest") { + def destBuildDir = "build/docker/${projectName}" + def destDir = "${destBuildDir}/jars" + copyArtifact(project, projectName, projectName, destDir) + } + static def copyArtifact(Project project, String artifactProjectName, String destProjectName, String destDir) { + project.task("copyArtifact_${destProjectName}", type: Copy) { + dependsOn ":${artifactProjectName}:build" + dependsOn ":${artifactProjectName}:jar" + if (destProjectName == "trafficCaptureProxyServerTest") { include "*.properties" } - from { project.project(":${projectName}").configurations.findByName("runtimeClasspath").files } - from { project.project(":${projectName}").tasks.getByName('jar') } - into artifactsDir + from { project.project(":${artifactProjectName}").configurations.findByName("runtimeClasspath").files } + from { project.project(":${artifactProjectName}").tasks.getByName('jar') } + into destDir include "*.jar" duplicatesStrategy = 'WARN' } diff --git a/TrafficCapture/dockerSolution/build.gradle b/TrafficCapture/dockerSolution/build.gradle index f65a60d20..672ecc89c 100644 --- a/TrafficCapture/dockerSolution/build.gradle +++ 
b/TrafficCapture/dockerSolution/build.gradle @@ -25,6 +25,11 @@ def dockerFilesForExternalServices = [ // Create the static docker files that aren't hosting migrations java code from this repo dockerFilesForExternalServices.each { projectName, dockerImageName -> task("buildDockerImage_${projectName}", type: DockerBuildImage) { + if (projectName == "migrationConsole") { + def destDir = "src/main/docker/${projectName}/build/jars" + CommonUtils.copyArtifact(project, "trafficReplayer", projectName, destDir) + dependsOn "copyArtifact_${projectName}" + } def hash = calculateDockerHash(projectName) images.add("migrations/${dockerImageName}:$hash") images.add("migrations/${dockerImageName}:latest") @@ -41,8 +46,6 @@ def baseImageProjectOverrides = [ ] javaContainerServices.each { projectName, dockerImageName -> - def dockerBuildDir = "build/docker/${projectName}" - def artifactsDir = "${dockerBuildDir}/jars"; CommonUtils.copyArtifact(project, projectName) CommonUtils.createDockerfile(project, projectName, baseImageProjectOverrides, dockerFilesForExternalServices) } diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile index 26e9c7c4d..914ae2ca2 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile @@ -4,11 +4,21 @@ ENV DEBIAN_FRONTEND noninteractive RUN apt-get update && \ apt-get install -y --no-install-recommends python3.9 python3-pip python3-dev openjdk-11-jre-headless wget gcc libc-dev git curl vim jq unzip less && \ - pip3 install urllib3==1.25.11 opensearch-benchmark==1.1.0 awscurl tqdm -# TODO upon the next release of opensearch-benchmark the awscli package should be installed by pip3, with the expected boto3 version upgrade resolving the current conflicts between opensearch-benchmark and awscli -RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && unzip awscliv2.zip && ./aws/install && rm -rf aws awscliv2.zip + pip3 install urllib3 opensearch-benchmark==1.2.0 awscurl tqdm awscli RUN mkdir /root/kafka-tools RUN mkdir /root/kafka-tools/aws + +WORKDIR /root/kafka-tools +# Get kafka distribution and unpack to 'kafka' +RUN wget -qO- https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz | tar --transform 's!^[^/]*!kafka!' 
-xvz +RUN wget -O kafka/libs/msk-iam-auth.jar https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.9/aws-msk-iam-auth-1.1.9-all.jar +WORKDIR /root + +# Add Traffic Replayer jars for running KafkaPrinter from this container +COPY build/jars /root/kafka-tools/replayer-jars +RUN printf "#!/bin/sh\njava -cp `echo /root/kafka-tools/replayer-jars/*.jar | tr \ :` \"\$@\" " > /root/kafka-tools/runJavaWithClasspath.sh +RUN chmod +x /root/kafka-tools/runJavaWithClasspath.sh + COPY runTestBenchmarks.sh /root/ COPY humanReadableLogs.py /root/ COPY simpleDocumentGenerator.py /root/ @@ -17,16 +27,13 @@ COPY showFetchMigrationCommand.sh /root/ COPY setupIntegTests.sh /root/ COPY msk-iam-auth.properties /root/kafka-tools/aws COPY kafkaCmdRef.md /root/kafka-tools +COPY kafkaExport.sh /root/kafka-tools RUN ln -s /usr/bin/python3 /usr/bin/python RUN chmod ug+x /root/runTestBenchmarks.sh RUN chmod ug+x /root/humanReadableLogs.py RUN chmod ug+x /root/simpleDocumentGenerator.py RUN chmod ug+x /root/catIndices.sh RUN chmod ug+x /root/showFetchMigrationCommand.sh -WORKDIR /root/kafka-tools -# Get kafka distribution and unpack to 'kafka' -RUN wget -qO- https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz | tar --transform 's!^[^/]*!kafka!' -xvz -RUN wget -O kafka/libs/msk-iam-auth.jar https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.9/aws-msk-iam-auth-1.1.9-all.jar -WORKDIR /root +RUN chmod ug+x /root/kafka-tools/kafkaExport.sh CMD tail -f /dev/null diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/kafkaExport.sh b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/kafkaExport.sh new file mode 100644 index 000000000..ef02e7d07 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/kafkaExport.sh @@ -0,0 +1,87 @@ +#!/bin/bash + +broker_endpoints="${MIGRATION_KAFKA_BROKER_ENDPOINTS}" +msk_auth_settings="" +kafka_command_settings="" +s3_bucket_name="" +if [ -n "$ECS_AGENT_URI" ]; then + msk_auth_settings="--kafka-traffic-enable-msk-auth" + kafka_command_settings="--command-config aws/msk-iam-auth.properties" + s3_bucket_name="migration-artifacts-$MIGRATION_STAGE-$AWS_REGION" +fi +topic="logging-traffic-topic" +timeout_seconds=60 +enable_s3=false + + +usage() { + echo "" + echo "Utility script for exporting all currently detected Kafka records to a gzip file, with the option to store this archive on S3." + echo "" + echo "Usage: " + echo " ./kafkaExport.sh <>" + echo "" + echo "Options:" + echo " --timeout-seconds Timeout for how long the process will try to collect Kafka records. Default is 60 seconds." + echo " --enable-s3 Option to store the created archive on S3." + echo " --s3-bucket-name Option to specify a given S3 bucket to store the archive in."
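+ echo " Example: ./kafkaExport.sh --timeout-seconds 120 --enable-s3"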
+ echo "" + exit 1 +} + +while [[ $# -gt 0 ]]; do + key="$1" + case $key in + --timeout-seconds) + timeout_seconds="$2" + shift + shift + ;; + --enable-s3) + enable_s3=true + shift + ;; + --s3-bucket-name) + s3_bucket_name="$2" + shift + shift + ;; + -h|--h|--help) + usage + ;; + -*) + echo "Unknown option $1" + usage + ;; + *) + shift + ;; + esac +done + +partition_offsets=$(./kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "$broker_endpoints" --topic "$topic" --time -1 $(echo "$kafka_command_settings")) +comma_sep_partition_offsets=$(echo $partition_offsets | sed 's/ /,/g') +echo "Collected offsets from current Kafka topic: " +echo $comma_sep_partition_offsets + +epoch_ts=$(date +%s) +dir_name="kafka_export_$epoch_ts" +mkdir -p $dir_name +archive_name="kafka_export_from_migration_console_$epoch_ts.tar.gz" +group="exportFromMigrationConsole_$(hostname -s)_$$_$epoch_ts" +echo "Group name: $group" + +set -o xtrace +./runJavaWithClasspath.sh org.opensearch.migrations.replay.KafkaPrinter --kafka-traffic-brokers "$broker_endpoints" --kafka-traffic-topic "$topic" --kafka-traffic-group-id "$group" $(echo "$msk_auth_settings") --timeout-seconds "$timeout_seconds" --partition-limits "$comma_sep_partition_offsets" --output-directory "./$dir_name" +set +o xtrace + +cd $dir_name +tar -czvf "$archive_name" *.proto && rm *.proto +cd .. + +# Remove created consumer group +./kafka/bin/kafka-consumer-groups.sh --bootstrap-server "$broker_endpoints" --timeout 100000 --delete --group "$group" $(echo "$kafka_command_settings") + +if [ "$enable_s3" = true ]; then + aws s3 mv "$dir_name/$archive_name" "s3://$s3_bucket_name" && echo "Export has been created: s3://$s3_bucket_name/$archive_name" +fi \ No newline at end of file diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runTestBenchmarks.sh b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runTestBenchmarks.sh index 558df039a..de74ae5fa 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runTestBenchmarks.sh +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/runTestBenchmarks.sh @@ -18,7 +18,6 @@ usage() { echo " --auth-user The basic auth user to use for OSB requests." echo " --auth-pass The basic auth password to use for OSB requests." echo " --no-auth Use no auth when making OSB requests." - echo " --no-ssl Disable SSL when making OSB requests." 
echo "" exit 1 } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java index c4b21179e..848e363c0 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/KafkaPrinter.java @@ -9,17 +9,29 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.UUID; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -34,14 +46,24 @@ * 3. Clearing records for a topic with the same group id */ public class KafkaPrinter { + private static final Logger log = LoggerFactory.getLogger(KafkaPrinter.class); public static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofSeconds(1); + static class PartitionTracker { + long currentRecordCount; + long recordLimit; + PartitionTracker(long currentRecordCount, long recordLimit) { + this.currentRecordCount = currentRecordCount; + this.recordLimit = recordLimit; + } + } + static class Parameters { @Parameter(required = true, names = {"--kafka-traffic-brokers"}, arity=1, - description = "Comma-separated list of host and port pairs that are the addresses of the Kafka brokers to bootstrap with i.e. 'localhost:9092,localhost2:9092'") + description = "Comma-separated list of host and port pairs that are the addresses of the Kafka brokers to bootstrap with e.g. 'localhost:9092,localhost2:9092'") String kafkaTrafficBrokers; @Parameter(required = true, names = {"--kafka-traffic-topic"}, @@ -63,6 +85,28 @@ static class Parameters { arity=1, description = "File path for Kafka properties file to use for additional or overriden Kafka properties") String kafkaTrafficPropertyFile; + @Parameter(required = false, + names = {"--partition-limits"}, + description = "Partition limit option will only print records for the provided partitions and up to the given limit " + + "specified. It will terminate the printer when all limits have been met. Argument can be used multiple " + + "times, and may be comma-separated, e.g. 'test-topic:0:10, test-topic:1:32' ") + List partitionLimits = new ArrayList<>(); + @Parameter(required = false, + names = {"--timeout-seconds"}, + arity=1, + description = "Timeout option for how long KafkaPrinter will continue to read from Kafka before terminating.") + long timeoutSeconds = 0; + @Parameter(required = false, + names = {"--output-directory"}, + arity=1, + description = "If provided will place output inside file(s) within this directory. 
Otherwise, output will be sent to STDOUT") + String outputDirectoryPath; + @Parameter(required = false, + names = {"--combine-partition-output"}, + arity=0, + description = "Creates a single output file with output from all partitions combined. Requires '--output-directory' to be specified.") + boolean combinePartitionOutput; + } public static Parameters parseArgs(String[] args) { @@ -79,7 +123,7 @@ public static Parameters parseArgs(String[] args) { } } - public static void main(String[] args) { + public static void main(String[] args) throws FileNotFoundException { Parameters params; try { params = parseArgs(args); @@ -87,6 +131,10 @@ public static void main(String[] args) { return; } + if (params.combinePartitionOutput && params.outputDirectoryPath == null) { + throw new ParameterException("The '--output-directory' parameter is required for using '--combine-partition-output'."); + } + String bootstrapServers = params.kafkaTrafficBrokers; String groupId = params.kafkaTrafficGroupId; String topic = params.kafkaTrafficTopic; @@ -114,9 +162,50 @@ public static void main(String[] args) { properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + Map capturedRecords = new HashMap<>(); + if (!params.partitionLimits.isEmpty()) { + for (String partitionLimit : params.partitionLimits) { + String[] partitionElements = partitionLimit.split(":"); + if (partitionElements.length != 3) { + throw new ParameterException("Partition limit provided does not match the expected format: topic_name:partition_id:num_records, actual value: " + partitionLimit); + } + TopicPartition partition = new TopicPartition(partitionElements[0], Integer.parseInt(partitionElements[1])); + if (capturedRecords.containsKey(partition)) { + throw new ParameterException("Duplicate parameter limit detected for the partition: " + partition); + } + capturedRecords.put(partition, new PartitionTracker(0, Long.parseLong(partitionElements[2]))); + } + } + + String baseOutputPath = params.outputDirectoryPath == null ? "./" : params.outputDirectoryPath; + baseOutputPath = !baseOutputPath.endsWith("/") ? baseOutputPath + "/" : baseOutputPath; + String uuid = UUID.randomUUID().toString(); + boolean separatePartitionOutputs = false; + Map partitionOutputStreams = new HashMap<>(); + // Grab all partition records + if (capturedRecords.isEmpty()) { + OutputStream os = params.outputDirectoryPath == null ? System.out : new FileOutputStream(String.format("%s%s_%s_%s.proto", baseOutputPath, params.kafkaTrafficTopic, "all", uuid)); + partitionOutputStreams.put(0, CodedOutputStream.newInstance(os)); + } + // Only grab specific partition records based on limits + else { + if (params.combinePartitionOutput || params.outputDirectoryPath == null) { + OutputStream os = params.outputDirectoryPath == null ? 
System.out : new FileOutputStream(String.format("%s%s_%s_%s.proto", baseOutputPath, params.kafkaTrafficTopic, "all", uuid)); + partitionOutputStreams.put(0, CodedOutputStream.newInstance(os)); + } + else { + for (TopicPartition partition : capturedRecords.keySet()) { + separatePartitionOutputs = true; + FileOutputStream fos = new FileOutputStream(String.format("%s%s_%d_%s.proto", baseOutputPath, partition.topic(), partition.partition(), uuid)); + partitionOutputStreams.put(partition.partition(), CodedOutputStream.newInstance(fos)); + } + } + } + try (KafkaConsumer consumer = new KafkaConsumer<>(properties)) { consumer.subscribe(Collections.singleton(topic)); - pipeRecordsToProtoBufDelimited(consumer, getDelimitedProtoBufOutputter(System.out)); + pipeRecordsToProtoBufDelimited(consumer, getDelimitedProtoBufOutputter(capturedRecords, partitionOutputStreams, separatePartitionOutputs), + params.timeoutSeconds, capturedRecords); } catch (WakeupException e) { log.info("Wake up exception!"); } catch (Exception e) { @@ -126,34 +215,85 @@ public static void main(String[] args) { } } + static boolean checkAllRecordsCompleted(Collection trackers) { + for (PartitionTracker tracker : trackers) { + if (tracker.currentRecordCount < tracker.recordLimit) { + return false; + } + } + return true; + } + static void pipeRecordsToProtoBufDelimited( - Consumer kafkaConsumer, - java.util.function.Consumer> binaryReceiver) { - while (true) { - processNextChunkOfKafkaEvents(kafkaConsumer, binaryReceiver); + Consumer kafkaConsumer, java.util.function.Consumer>> binaryReceiver, + long timeoutSeconds, Map capturedRecords) { + + long endTime = System.currentTimeMillis() + (timeoutSeconds * 1000); + boolean continueCapture = true; + while (continueCapture) { + if (!capturedRecords.isEmpty() && checkAllRecordsCompleted(capturedRecords.values())) { + log.info("All partition limits have been met, stopping Kafka polls"); + continueCapture = false; + } + else if (timeoutSeconds > 0 && System.currentTimeMillis() >= endTime) { + log.warn("Specified timeout of {} seconds has been breached, stopping Kafka polls", timeoutSeconds); + continueCapture = false; + } + else { + for (PartitionTracker pt : capturedRecords.values()) { + log.debug("Tracker is at {} records for limit {}", pt.currentRecordCount, pt.recordLimit); + } + processNextChunkOfKafkaEvents(kafkaConsumer, binaryReceiver); + } } } - static void processNextChunkOfKafkaEvents(Consumer kafkaConsumer, java.util.function.Consumer> binaryReceiver) { + static void processNextChunkOfKafkaEvents(Consumer kafkaConsumer, java.util.function.Consumer>> binaryReceiver) { var records = kafkaConsumer.poll(CONSUMER_POLL_TIMEOUT); - binaryReceiver.accept(StreamSupport.stream(records.spliterator(), false) - .map(ConsumerRecord::value)); + binaryReceiver.accept(StreamSupport.stream(records.spliterator(), false)); } - static java.util.function.Consumer> getDelimitedProtoBufOutputter(OutputStream outputStream) { - CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(outputStream); + static java.util.function.Consumer>> getDelimitedProtoBufOutputter(Map capturedRecords, + Map partitionOutputStreams, boolean separatePartitionOutputs) { - return bufferStream -> { - bufferStream.forEach(buffer -> { + Set usedCodedOutputStreams = new HashSet<>(); + + return consumerRecordStream -> { + consumerRecordStream.forEach(consumerRecord -> { try { - codedOutputStream.writeUInt32NoTag(buffer.length); - codedOutputStream.writeRawBytes(buffer); + // No partition limits case, output everything 
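+ // With no partition limits configured, every record goes to the single output stream registered under key 0,
+ // written length-delimited: a varint size prefix followed by the raw record bytes.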
+ if (capturedRecords.isEmpty()) { + CodedOutputStream codedOutputStream = partitionOutputStreams.get(0); + usedCodedOutputStreams.add(codedOutputStream); + byte[] buffer = consumerRecord.value(); + codedOutputStream.writeUInt32NoTag(buffer.length); + codedOutputStream.writeRawBytes(buffer); + } + else { + TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); + log.debug("Incoming record for topic:{} and partition:{}", partition.topic(), partition.partition()); + PartitionTracker tracker = capturedRecords.get(partition); + boolean outputNeeded = false; + if (tracker != null && tracker.currentRecordCount < tracker.recordLimit) { + tracker.currentRecordCount++; + outputNeeded = true; + } + if (outputNeeded) { + CodedOutputStream codedOutputStream = separatePartitionOutputs ? partitionOutputStreams.get(partition.partition()) : partitionOutputStreams.get(0); + usedCodedOutputStreams.add(codedOutputStream); + byte[] buffer = consumerRecord.value(); + codedOutputStream.writeUInt32NoTag(buffer.length); + codedOutputStream.writeRawBytes(buffer); + } + } } catch (IOException e) { throw Lombok.sneakyThrow(e); } }); try { - codedOutputStream.flush(); + for (CodedOutputStream cos : usedCodedOutputStreams) { + cos.flush(); + } } catch (IOException e) { throw Lombok.sneakyThrow(e); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaPrinterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaPrinterTest.java index 2479999e7..ecdee1684 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaPrinterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaPrinterTest.java @@ -1,13 +1,12 @@ package org.opensearch.migrations.replay; import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Timestamp; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -17,19 +16,15 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.OutputStream; -import java.io.PipedOutputStream; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.time.Instant; -import java.util.List; +import java.util.ArrayList; +import java.util.HashMap; import java.util.Map; import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Stream; -import java.util.stream.StreamSupport; @Slf4j class KafkaPrinterTest { @@ -64,16 +59,16 @@ private static byte[] makeTrafficStreamBytes(Instant t, String payload, int numR } } - private static class CountingConsumer implements Consumer> { - final java.util.function.Consumer> underlyingConsumer; + private static class CountingConsumer implements Consumer>> { + final java.util.function.Consumer>> underlyingConsumer; int count; - public CountingConsumer(Consumer> underlyingConsumer) { + public CountingConsumer(Consumer>> underlyingConsumer) { this.underlyingConsumer = underlyingConsumer; } 
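+ // Counts each record as it streams through to the wrapped outputter so the test loop can stop polling once the expected number of messages has been read.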
@Override - public void accept(Stream stream) { + public void accept(Stream> stream) { underlyingConsumer.accept(stream.map(msg->{log.trace("read msg"); count++; return msg;})); } } @@ -82,17 +77,62 @@ public void accept(Stream stream) { public void testStreamFormatting() throws Exception { Random random = new Random(2); var numTrafficStreams = 10; - var kafkaConsumer = makeKafkaConsumer(numTrafficStreams, () -> random.nextInt(NUM_READ_ITEMS_BOUND)); - var delimitedOutputBytes = getOutputFromConsumer(kafkaConsumer, numTrafficStreams); + var kafkaConsumer = makeKafkaConsumer(Map.of(0, numTrafficStreams), () -> random.nextInt(NUM_READ_ITEMS_BOUND)); + var emptyPartitionLimits = new HashMap(); + var delimitedOutputBytes = getOutputFromConsumer(kafkaConsumer, numTrafficStreams, emptyPartitionLimits); validateNumberOfTrafficStreamsEmitted(NUM_PROTOBUF_OBJECTS, delimitedOutputBytes); } + @Test + public void testSinglePartitionLimiting() throws Exception { + Random random = new Random(3); + // Use larger number of streams than recordLimit cutoff + var numTrafficStreams = 20; + var recordLimit = 10; + var kafkaConsumer = makeKafkaConsumer(Map.of(0, numTrafficStreams), () -> random.nextInt(NUM_READ_ITEMS_BOUND)); + var partitionLimits = new HashMap(); + TopicPartition partition0 = new TopicPartition(TEST_TOPIC_NAME, 0); + partitionLimits.put(partition0, new KafkaPrinter.PartitionTracker(0, recordLimit)); + var delimitedOutputBytes = getOutputFromConsumer(kafkaConsumer, numTrafficStreams, partitionLimits); + Assertions.assertEquals(recordLimit, partitionLimits.get(partition0).currentRecordCount); + validateNumberOfTrafficStreamsEmitted(NUM_PROTOBUF_OBJECTS, delimitedOutputBytes); + } + + @Test + public void testMultiplePartitionLimiting() throws Exception { + Random random = new Random(4); + // Use larger number of streams than recordLimit cutoff + var numTrafficStreamsPartition0 = 30; + var numTrafficStreamsPartition1 = 20; + var numTrafficStreamsPartition2 = 0; + var totalTrafficStreams = numTrafficStreamsPartition0 + numTrafficStreamsPartition1 + numTrafficStreamsPartition2; + var recordLimitPartition0 = 17; + var recordLimitPartition1 = 8; + var recordLimitPartition2 = 0; + var totalLimitTrafficStreams = recordLimitPartition0 + recordLimitPartition1 + recordLimitPartition2; + + var kafkaConsumer = makeKafkaConsumer(Map.of(0, numTrafficStreamsPartition0, 1, numTrafficStreamsPartition1, 2, numTrafficStreamsPartition2), () -> random.nextInt(NUM_READ_ITEMS_BOUND)); + var partitionLimits = new HashMap(); + TopicPartition partition0 = new TopicPartition(TEST_TOPIC_NAME, 0); + TopicPartition partition1 = new TopicPartition(TEST_TOPIC_NAME, 1); + TopicPartition partition2 = new TopicPartition(TEST_TOPIC_NAME, 2); + partitionLimits.put(partition0, new KafkaPrinter.PartitionTracker(0, recordLimitPartition0)); + partitionLimits.put(partition1, new KafkaPrinter.PartitionTracker(0, recordLimitPartition1)); + partitionLimits.put(partition2, new KafkaPrinter.PartitionTracker(0, recordLimitPartition2)); + var delimitedOutputBytes = getOutputFromConsumer(kafkaConsumer, totalTrafficStreams, partitionLimits); + Assertions.assertEquals(recordLimitPartition0, partitionLimits.get(partition0).currentRecordCount); + Assertions.assertEquals(recordLimitPartition1, partitionLimits.get(partition1).currentRecordCount); + Assertions.assertEquals(recordLimitPartition2, partitionLimits.get(partition2).currentRecordCount); + validateNumberOfTrafficStreamsEmitted(totalLimitTrafficStreams, delimitedOutputBytes); + } + private 
byte[] getOutputFromConsumer(org.apache.kafka.clients.consumer.Consumer kafkaConsumer, - int expectedMessageCount) + int expectedMessageCount, Map capturedRecords) throws Exception { try (var baos = new ByteArrayOutputStream()) { - var wrappedConsumer = new CountingConsumer(KafkaPrinter.getDelimitedProtoBufOutputter(baos)); + var wrappedConsumer = new CountingConsumer(KafkaPrinter.getDelimitedProtoBufOutputter(capturedRecords, Map.of(0, + CodedOutputStream.newInstance(baos)), false)); while (wrappedConsumer.count < expectedMessageCount) { KafkaPrinter.processNextChunkOfKafkaEvents(kafkaConsumer, wrappedConsumer); } @@ -117,20 +157,29 @@ private void validateNumberOfTrafficStreamsEmitted(int expectedNumProtobufObject } private org.apache.kafka.clients.consumer.Consumer - makeKafkaConsumer(int numTrafficStreams, Supplier numReadGenerator) + makeKafkaConsumer(Map partitionIdToNumTrafficStreams, Supplier numReadGenerator) throws Exception { var mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST); - var topicPartition = new TopicPartition(TEST_TOPIC_NAME, 0); - var tpList = List.of(topicPartition); + var tpList = new ArrayList(); + var offsetMap = new HashMap (); + for (int partitionId : partitionIdToNumTrafficStreams.keySet()) { + var topicPartition = new TopicPartition(TEST_TOPIC_NAME, partitionId); + tpList.add(topicPartition); + offsetMap.put(topicPartition, 0L); + } mockConsumer.assign(tpList); - mockConsumer.updateBeginningOffsets(Map.of(topicPartition, 0L)); - for (int i=0; i partitionEntry : partitionIdToNumTrafficStreams.entrySet()) { + var partitionId = partitionEntry.getKey(); + var numTrafficStreams = partitionEntry.getValue(); + for (int i = 0; i < numTrafficStreams; ++i) { + var payload = ("" + (char) ('A' + (char) i)).repeat(10); + var data = makeTrafficStreamBytes(Instant.now(), payload, numReadGenerator.get()); + var record = new ConsumerRecord(TEST_TOPIC_NAME, partitionId, 1 + i, Instant.now().toString(), data); + log.trace("adding record"); + mockConsumer.addRecord(record); + } } return mockConsumer; } diff --git a/deployment/cdk/opensearch-service-migration/lib/migration-assistance-stack.ts b/deployment/cdk/opensearch-service-migration/lib/migration-assistance-stack.ts index 3a2870b72..d6442346b 100644 --- a/deployment/cdk/opensearch-service-migration/lib/migration-assistance-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/migration-assistance-stack.ts @@ -9,6 +9,7 @@ import {LogGroup, RetentionDays} from "aws-cdk-lib/aws-logs"; import {NamespaceType} from "aws-cdk-lib/aws-servicediscovery"; import {StringParameter} from "aws-cdk-lib/aws-ssm"; import {StreamingSourceType} from "./streaming-source-type"; +import {Bucket, BucketEncryption} from "aws-cdk-lib/aws-s3"; export interface MigrationStackProps extends StackPropsExt { readonly vpc: IVpc, @@ -181,6 +182,17 @@ export class MigrationAssistanceStack extends Stack { stringValue: serviceConnectSecurityGroup.securityGroupId }); + const artifactBucket = new Bucket(this, 'migrationArtifactsS3', { + bucketName: `migration-artifacts-${props.stage}-${this.region}`, + encryption: BucketEncryption.S3_MANAGED, + enforceSSL: true + }); + new StringParameter(this, 'SSMParameterArtifactS3Arn', { + description: 'OpenSearch migration parameter for Artifact S3 bucket ARN', + parameterName: `/migration/${props.stage}/${props.defaultDeployId}/artifactS3Arn`, + stringValue: artifactBucket.bucketArn + }); + const ecsCluster = new Cluster(this, 'migrationECSCluster', { vpc: props.vpc, clusterName: 
`migration-${props.stage}-ecs-cluster` diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts index 49b50059d..38fcee01e 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts @@ -63,7 +63,7 @@ export class CaptureProxyESStack extends MigrationServiceCore { command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command this.createService({ serviceName: "capture-proxy-es", - dockerFilePath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/build/docker/trafficCaptureProxyServer"), + dockerDirectoryPath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/build/docker/trafficCaptureProxyServer"), dockerImageCommand: ['/bin/sh', '-c', command.concat(" & wait -n 1")], securityGroups: securityGroups, taskRolePolicies: servicePolicies, diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-stack.ts index f0cba8dc8..91f40e44b 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-stack.ts @@ -54,7 +54,7 @@ export class CaptureProxyStack extends MigrationServiceCore { command = props.extraArgs ? command.concat(` ${props.extraArgs}`) : command this.createService({ serviceName: "capture-proxy", - dockerFilePath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/build/docker/trafficCaptureProxyServer"), + dockerDirectoryPath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/build/docker/trafficCaptureProxyServer"), dockerImageCommand: ['/bin/sh', '-c', command], securityGroups: securityGroups, taskRolePolicies: servicePolicies, diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/elasticsearch-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/elasticsearch-stack.ts index 5492761a2..ca1bbca88 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/elasticsearch-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/elasticsearch-stack.ts @@ -39,7 +39,7 @@ export class ElasticsearchStack extends MigrationServiceCore { this.createService({ serviceName: "elasticsearch", - dockerFilePath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/src/main/docker/elasticsearchWithSearchGuard"), + dockerDirectoryPath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/src/main/docker/elasticsearchWithSearchGuard"), securityGroups: securityGroups, portMappings: [servicePort], serviceConnectServices: [serviceConnectService], diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-analytics-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-analytics-stack.ts index bc6d56778..ca5cd62b0 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-analytics-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-analytics-stack.ts @@ -79,7 +79,7 @@ export class MigrationAnalyticsStack extends MigrationServiceCore { this.createService({ serviceName: `otel-collector`, - dockerFilePath: 
join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/src/main/docker/otelCollector"), + dockerDirectoryPath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/src/main/docker/otelCollector"), dockerImageCommand: ["--config=/etc/otel-config-aws.yaml"], securityGroups: securityGroups, taskCpuUnits: 1024, diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts index b9334d4f6..16013cb40 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts @@ -7,7 +7,6 @@ import {MigrationServiceCore} from "./migration-service-core"; import {StringParameter} from "aws-cdk-lib/aws-ssm"; import {Effect, PolicyStatement} from "aws-cdk-lib/aws-iam"; import { - createMSKConsumerIAMPolicies, createOpenSearchIAMAccessPolicy, createOpenSearchServerlessIAMAccessPolicy } from "../common-utilities"; @@ -99,13 +98,25 @@ export class MigrationConsoleStack extends MigrationServiceCore { "ecs:UpdateService" ] }) + + const artifactS3Arn = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/artifactS3Arn`) + const artifactS3AnyObjectPath = `${artifactS3Arn}/*` + const artifactS3PublishPolicy = new PolicyStatement({ + effect: Effect.ALLOW, + resources: [artifactS3AnyObjectPath], + actions: [ + "s3:PutObject" + ] + }) + const environment: { [key: string]: string; } = { "MIGRATION_DOMAIN_ENDPOINT": osClusterEndpoint, - "MIGRATION_KAFKA_BROKER_ENDPOINTS": brokerEndpoints + "MIGRATION_KAFKA_BROKER_ENDPOINTS": brokerEndpoints, + "MIGRATION_STAGE": props.stage } const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.region, this.account) const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.region, this.account) - let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, updateReplayerServicePolicy] + let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, updateReplayerServicePolicy, artifactS3PublishPolicy] if (props.streamingSourceType === StreamingSourceType.AWS_MSK) { const mskAdminPolicies = this.createMSKAdminIAMPolicies(props.stage, props.defaultDeployId) servicePolicies = servicePolicies.concat(mskAdminPolicies) @@ -137,7 +148,7 @@ export class MigrationConsoleStack extends MigrationServiceCore { this.createService({ serviceName: "migration-console", - dockerFilePath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/src/main/docker/migrationConsole"), + dockerDirectoryPath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/src/main/docker/migrationConsole"), securityGroups: securityGroups, volumes: [replayerOutputEFSVolume], mountPoints: [replayerOutputMountPoint], diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts index dd5b70cf9..1b60e79cc 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts @@ -26,6 +26,7 @@ export interface MigrationServiceCoreProps extends StackPropsExt { readonly vpc: IVpc, readonly securityGroups: ISecurityGroup[], readonly 
dockerFilePath?: string, + readonly dockerDirectoryPath?: string, readonly dockerImageRegistryName?: string, readonly dockerImageCommand?: string[], readonly taskRolePolicies?: PolicyStatement[], @@ -72,8 +73,8 @@ export class MigrationServiceCore extends Stack { } createService(props: MigrationServiceCoreProps) { - if ((!props.dockerFilePath && !props.dockerImageRegistryName) || (props.dockerFilePath && props.dockerImageRegistryName)) { - throw new Error(`Exactly one option [dockerFilePath, dockerImageRegistryName] is required to create the "${props.serviceName}" service`) + if ((!props.dockerDirectoryPath && !props.dockerImageRegistryName) || (props.dockerDirectoryPath && props.dockerImageRegistryName)) { + throw new Error(`Exactly one option [dockerDirectoryPath, dockerImageRegistryName] is required to create the "${props.serviceName}" service`) } const ecsCluster = Cluster.fromClusterAttributes(this, 'ecsCluster', { @@ -95,9 +96,11 @@ export class MigrationServiceCore extends Stack { } let serviceImage - if (props.dockerFilePath) { + if (props.dockerDirectoryPath) { serviceImage = ContainerImage.fromDockerImageAsset(new DockerImageAsset(this, "ServiceImage", { - directory: props.dockerFilePath + directory: props.dockerDirectoryPath, + // File path relative to above directory path + file: props.dockerFilePath })) } else { diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts index fc031993f..a290b86c7 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts @@ -93,7 +93,7 @@ export class TrafficReplayerStack extends MigrationServiceCore { this.createService({ serviceName: `traffic-replayer-${deployId}`, taskInstanceCount: 0, - dockerFilePath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/build/docker/trafficReplayer"), + dockerDirectoryPath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/build/docker/trafficReplayer"), dockerImageCommand: ['/bin/sh', '-c', replayerCommand], securityGroups: securityGroups, volumes: [replayerOutputEFSVolume], diff --git a/deployment/cdk/opensearch-service-migration/options.md b/deployment/cdk/opensearch-service-migration/options.md index 2dc0ad144..68005b7ba 100644 --- a/deployment/cdk/opensearch-service-migration/options.md +++ b/deployment/cdk/opensearch-service-migration/options.md @@ -1,5 +1,5 @@ ## Configuration Options -These tables list all CDK context configuration values a user can specify for this project +These tables list all CDK context configuration values a user can specify for this project. These will normally be added to the `cdk.context.json` file in the same directory as this markdown file. ### Migration Service Options From d3820af282e76efd30d39d586c8db2e2da63b14a Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Tue, 13 Feb 2024 11:25:36 -0800 Subject: [PATCH 2/2] [Fetch Migration] Custom exceptions and other readability improvements (#506) This PR incorporates two pieces of prior feedback: 1. The "index_operations.py" file has been renamed to "index_management.py" to eliminate confusion on whether the operations in this file involve indexing of data (they don't). 2. 
Custom exceptions have been created for "index_management.py" and "metadata_migration.py" to use instead of the generic "RuntimeError" class to help disambiguate errors in the call stack. This change also includes a minor bugfix to the "showFetchMigrationCommand.sh" script to move default value declaration so that command-line arguments are parsed correctly. --------- Signed-off-by: Kartik Ganesh --- FetchMigration/python/exceptions.py | 22 ++++++ FetchMigration/python/index_diff.py | 2 +- ...ndex_operations.py => index_management.py} | 43 ++++++------ FetchMigration/python/metadata_migration.py | 27 +++---- ...operations.py => test_index_management.py} | 70 +++++++++++-------- .../python/tests/test_metadata_migration.py | 53 +++++++------- .../showFetchMigrationCommand.sh | 8 +-- 7 files changed, 129 insertions(+), 96 deletions(-) create mode 100644 FetchMigration/python/exceptions.py rename FetchMigration/python/{index_operations.py => index_management.py} (81%) rename FetchMigration/python/tests/{test_index_operations.py => test_index_management.py} (85%) diff --git a/FetchMigration/python/exceptions.py b/FetchMigration/python/exceptions.py new file mode 100644 index 000000000..bd9fe17c6 --- /dev/null +++ b/FetchMigration/python/exceptions.py @@ -0,0 +1,22 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +class RequestError(RuntimeError): + def __init__(self, message=None): + super().__init__(message) + + +class IndexManagementError(RuntimeError): + def __init__(self, message=None): + super().__init__(message) + + +class MetadataMigrationError(RuntimeError): + def __init__(self, message=None): + super().__init__(message) diff --git a/FetchMigration/python/index_diff.py b/FetchMigration/python/index_diff.py index 5e980ba22..967f784d1 100644 --- a/FetchMigration/python/index_diff.py +++ b/FetchMigration/python/index_diff.py @@ -8,7 +8,7 @@ # import utils -from index_operations import SETTINGS_KEY, MAPPINGS_KEY +from index_management import SETTINGS_KEY, MAPPINGS_KEY # Computes and captures differences in indices between a "source" cluster diff --git a/FetchMigration/python/index_operations.py b/FetchMigration/python/index_management.py similarity index 81% rename from FetchMigration/python/index_operations.py rename to FetchMigration/python/index_management.py index 7afecf7c6..7bbcd58f4 100644 --- a/FetchMigration/python/index_operations.py +++ b/FetchMigration/python/index_management.py @@ -14,6 +14,7 @@ from component_template_info import ComponentTemplateInfo from endpoint_info import EndpointInfo +from exceptions import IndexManagementError, RequestError from index_doc_count import IndexDocCount from index_template_info import IndexTemplateInfo @@ -46,21 +47,21 @@ def __send_get_request(url: str, endpoint: EndpointInfo, payload: Optional[dict] timeout=__TIMEOUT_SECONDS) resp.raise_for_status() return resp - except requests.ConnectionError: - raise RuntimeError(f"ConnectionError on GET request to cluster endpoint: {endpoint.get_url()}") + except requests.ConnectionError as e: + raise RequestError(f"ConnectionError on GET request to cluster endpoint: {endpoint.get_url()}") from e except requests.HTTPError as e: - raise RuntimeError(f"HTTPError on GET request to cluster endpoint: {endpoint.get_url()} - {e!s}") - except requests.Timeout: + raise RequestError(f"HTTPError on GET request to cluster 
endpoint: {endpoint.get_url()}") from e + except requests.Timeout as e: # TODO retry mechanism - raise RuntimeError(f"Timed out on GET request to cluster endpoint: {endpoint.get_url()}") + raise RequestError(f"Timed out on GET request to cluster endpoint: {endpoint.get_url()}") from e except requests.exceptions.RequestException as e: - raise RuntimeError(f"GET request failure to cluster endpoint: {endpoint.get_url()} - {e!s}") + raise RequestError(f"GET request failure to cluster endpoint: {endpoint.get_url()}") from e def fetch_all_indices(endpoint: EndpointInfo) -> dict: all_indices_url: str = endpoint.add_path(__ALL_INDICES_ENDPOINT) try: - # raises RuntimeError in case of any request errors + # raises RequestError in case of any request errors resp = __send_get_request(all_indices_url, endpoint) result = dict(resp.json()) for index in list(result.keys()): @@ -74,8 +75,8 @@ def fetch_all_indices(endpoint: EndpointInfo) -> dict: if __INDEX_KEY in index_settings: index_settings[__INDEX_KEY].pop(setting, None) return result - except RuntimeError as e: - raise RuntimeError(f"Failed to fetch metadata from cluster endpoint: {e!s}") + except RequestError as e: + raise IndexManagementError("Failed to fetch metadata from cluster endpoint") from e def create_indices(indices_data: dict, endpoint: EndpointInfo) -> dict: @@ -101,7 +102,7 @@ def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount: count_endpoint_suffix: str = ','.join(indices) + __SEARCH_COUNT_PATH doc_count_endpoint: str = endpoint.add_path(count_endpoint_suffix) try: - # raises RuntimeError in case of any request errors + # raises RequestError in case of any request errors resp = __send_get_request(doc_count_endpoint, endpoint, __SEARCH_COUNT_PAYLOAD) result = dict(resp.json()) total: int = __TOTAL_COUNT_JSONPATH.find(result)[0].value @@ -110,13 +111,13 @@ def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount: for entry in counts_list: count_map[entry[__BUCKET_INDEX_NAME_KEY]] = entry[__BUCKET_DOC_COUNT_KEY] return IndexDocCount(total, count_map) - except RuntimeError as e: - raise RuntimeError(f"Failed to fetch doc_count: {e!s}") + except RequestError as e: + raise IndexManagementError("Failed to fetch doc_count") from e def __fetch_templates(endpoint: EndpointInfo, path: str, root_key: str, factory) -> set: url: str = endpoint.add_path(path) - # raises RuntimeError in case of any request errors + # raises RequestError in case of any request errors try: resp = __send_get_request(url, endpoint) result = set() @@ -124,27 +125,27 @@ def __fetch_templates(endpoint: EndpointInfo, path: str, root_key: str, factory) for template in resp.json()[root_key]: result.add(factory(template)) return result - except RuntimeError as e: + except RequestError as e: # Chain the underlying exception as a cause - raise RuntimeError("Failed to fetch template metadata from cluster endpoint") from e + raise IndexManagementError("Failed to fetch template metadata from cluster endpoint") from e def fetch_all_component_templates(endpoint: EndpointInfo) -> set[ComponentTemplateInfo]: try: - # raises RuntimeError in case of any request errors + # raises RequestError in case of any request errors return __fetch_templates(endpoint, __COMPONENT_TEMPLATES_PATH, __COMPONENT_TEMPLATE_LIST_KEY, lambda t: ComponentTemplateInfo(t)) - except RuntimeError as e: - raise RuntimeError("Failed to fetch component template metadata") from e + except IndexManagementError as e: + raise IndexManagementError("Failed to fetch component template metadata") 
from e def fetch_all_index_templates(endpoint: EndpointInfo) -> set[IndexTemplateInfo]: try: - # raises RuntimeError in case of any request errors + # raises RequestError in case of any request errors return __fetch_templates(endpoint, __INDEX_TEMPLATES_PATH, __INDEX_TEMPLATE_LIST_KEY, lambda t: IndexTemplateInfo(t)) - except RuntimeError as e: - raise RuntimeError("Failed to fetch index template metadata") from e + except IndexManagementError as e: + raise IndexManagementError("Failed to fetch index template metadata") from e def __create_templates(templates: set[ComponentTemplateInfo], endpoint: EndpointInfo, template_path: str) -> dict: diff --git a/FetchMigration/python/metadata_migration.py b/FetchMigration/python/metadata_migration.py index 28a4a6b6a..8b833a31b 100644 --- a/FetchMigration/python/metadata_migration.py +++ b/FetchMigration/python/metadata_migration.py @@ -13,9 +13,10 @@ import yaml import endpoint_utils -import index_operations +import index_management import utils from endpoint_info import EndpointInfo +from exceptions import MetadataMigrationError from index_diff import IndexDiff from metadata_migration_params import MetadataMigrationParams from metadata_migration_result import MetadataMigrationResult @@ -55,16 +56,16 @@ def index_metadata_migration(source: EndpointInfo, target: EndpointInfo, args: MetadataMigrationParams) -> MetadataMigrationResult: result = MetadataMigrationResult() # Fetch indices - source_indices = index_operations.fetch_all_indices(source) + source_indices = index_management.fetch_all_indices(source) # If source indices is empty, return immediately if len(source_indices.keys()) == 0: return result - target_indices = index_operations.fetch_all_indices(target) + target_indices = index_management.fetch_all_indices(target) # Compute index differences and create result object diff = IndexDiff(source_indices, target_indices) if diff.identical_indices: # Identical indices with zero documents on the target are eligible for migration - target_doc_count = index_operations.doc_count(diff.identical_indices, target) + target_doc_count = index_management.doc_count(diff.identical_indices, target) # doc_count only returns indices that have non-zero counts, so the difference in responses # gives us the set of identical, empty indices result.migration_indices = diff.identical_indices.difference(target_doc_count.index_doc_count_map.keys()) @@ -72,7 +73,7 @@ def index_metadata_migration(source: EndpointInfo, target: EndpointInfo, if diff.indices_to_create: result.migration_indices.update(diff.indices_to_create) if result.migration_indices: - doc_count_result = index_operations.doc_count(result.migration_indices, source) + doc_count_result = index_management.doc_count(result.migration_indices, source) result.target_doc_count = doc_count_result.total # Print report if args.report: @@ -82,13 +83,13 @@ def index_metadata_migration(source: EndpointInfo, target: EndpointInfo, index_data = dict() for index_name in diff.indices_to_create: index_data[index_name] = source_indices[index_name] - failed_indices = index_operations.create_indices(index_data, target) + failed_indices = index_management.create_indices(index_data, target) fail_count = len(failed_indices) if fail_count > 0: logging.error(f"Failed to create {fail_count} of {len(index_data)} indices") for failed_index_name, error in failed_indices.items(): logging.error(f"Index name {failed_index_name} failed: {error!s}") - raise RuntimeError("Metadata migration failed, index creation unsuccessful") + raise 
MetadataMigrationError("Metadata migration failed, index creation unsuccessful") return result @@ -109,16 +110,16 @@ def __log_template_failures(failures: dict, target_count: int) -> bool: # Raises RuntimeError if component/index template migration fails def template_migration(source: EndpointInfo, target: EndpointInfo): # Fetch and migrate component templates first - templates = index_operations.fetch_all_component_templates(source) - failures = index_operations.create_component_templates(templates, target) + templates = index_management.fetch_all_component_templates(source) + failures = index_management.create_component_templates(templates, target) if not __log_template_failures(failures, len(templates)): # Only migrate index templates if component template migration had no failures - templates = index_operations.fetch_all_index_templates(source) - failures = index_operations.create_index_templates(templates, target) + templates = index_management.fetch_all_index_templates(source) + failures = index_management.create_index_templates(templates, target) if __log_template_failures(failures, len(templates)): - raise RuntimeError("Failed to create some index templates") + raise MetadataMigrationError("Failed to create some index templates") else: - raise RuntimeError("Failed to create some component templates, aborting index template creation") + raise MetadataMigrationError("Failed to create some component templates, aborting index template creation") def run(args: MetadataMigrationParams) -> MetadataMigrationResult: diff --git a/FetchMigration/python/tests/test_index_operations.py b/FetchMigration/python/tests/test_index_management.py similarity index 85% rename from FetchMigration/python/tests/test_index_operations.py rename to FetchMigration/python/tests/test_index_management.py index fa066e279..76b750931 100644 --- a/FetchMigration/python/tests/test_index_operations.py +++ b/FetchMigration/python/tests/test_index_management.py @@ -14,9 +14,10 @@ import responses from responses import matchers -import index_operations +import index_management from component_template_info import ComponentTemplateInfo from endpoint_info import EndpointInfo +from exceptions import IndexManagementError, RequestError from index_template_info import IndexTemplateInfo from tests import test_constants @@ -29,7 +30,7 @@ def create_base_template_response(list_name: str, body: dict) -> dict: }}}]} -class TestIndexOperations(unittest.TestCase): +class TestIndexManagement(unittest.TestCase): @responses.activate def test_fetch_all_indices(self): # Set up GET response @@ -45,7 +46,7 @@ def test_fetch_all_indices(self): } responses.get(test_constants.SOURCE_ENDPOINT + "*", json=test_data) # Now send request - index_data = index_operations.fetch_all_indices(EndpointInfo(test_constants.SOURCE_ENDPOINT)) + index_data = index_management.fetch_all_indices(EndpointInfo(test_constants.SOURCE_ENDPOINT)) self.assertEqual(3, len(index_data.keys())) # Test that internal data has been filtered, but non-internal data is retained index_settings = index_data[test_constants.INDEX1_NAME][test_constants.SETTINGS_KEY] @@ -63,7 +64,7 @@ def test_create_indices(self): match=[matchers.json_params_matcher(test_data[test_constants.INDEX2_NAME])]) responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX3_NAME, match=[matchers.json_params_matcher(test_data[test_constants.INDEX3_NAME])]) - failed = index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT)) + failed = 
diff --git a/FetchMigration/python/tests/test_index_operations.py b/FetchMigration/python/tests/test_index_management.py
similarity index 85%
rename from FetchMigration/python/tests/test_index_operations.py
rename to FetchMigration/python/tests/test_index_management.py
index fa066e279..76b750931 100644
--- a/FetchMigration/python/tests/test_index_operations.py
+++ b/FetchMigration/python/tests/test_index_management.py
@@ -14,9 +14,10 @@
 import responses
 from responses import matchers
 
-import index_operations
+import index_management
 from component_template_info import ComponentTemplateInfo
 from endpoint_info import EndpointInfo
+from exceptions import IndexManagementError, RequestError
 from index_template_info import IndexTemplateInfo
 from tests import test_constants
 
@@ -29,7 +30,7 @@ def create_base_template_response(list_name: str, body: dict) -> dict:
         }}}]}
 
 
-class TestIndexOperations(unittest.TestCase):
+class TestIndexManagement(unittest.TestCase):
     @responses.activate
     def test_fetch_all_indices(self):
         # Set up GET response
@@ -45,7 +46,7 @@ def test_fetch_all_indices(self):
         }
         responses.get(test_constants.SOURCE_ENDPOINT + "*", json=test_data)
         # Now send request
-        index_data = index_operations.fetch_all_indices(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        index_data = index_management.fetch_all_indices(EndpointInfo(test_constants.SOURCE_ENDPOINT))
         self.assertEqual(3, len(index_data.keys()))
         # Test that internal data has been filtered, but non-internal data is retained
         index_settings = index_data[test_constants.INDEX1_NAME][test_constants.SETTINGS_KEY]
@@ -63,7 +64,7 @@ def test_create_indices(self):
                       match=[matchers.json_params_matcher(test_data[test_constants.INDEX2_NAME])])
         responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX3_NAME,
                       match=[matchers.json_params_matcher(test_data[test_constants.INDEX3_NAME])])
-        failed = index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
+        failed = index_management.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
         self.assertEqual(0, len(failed))
 
     @responses.activate
@@ -78,11 +79,11 @@ def test_create_indices_empty_alias(self):
         responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX1_NAME,
                       match=[matchers.json_params_matcher(expected_payload)])
         # Empty "aliases" should be stripped
-        failed = index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
+        failed = index_management.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
         self.assertEqual(0, len(failed))
         # Index without "aliases" should not fail
         del test_data[test_constants.INDEX1_NAME][aliases_key]
-        failed = index_operations.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
+        failed = index_management.create_indices(test_data, EndpointInfo(test_constants.TARGET_ENDPOINT))
         self.assertEqual(0, len(failed))
 
     @responses.activate
@@ -94,7 +95,7 @@ def test_create_indices_exceptions(self):
                       json={})
         responses.put(test_constants.TARGET_ENDPOINT + test_constants.INDEX3_NAME,
                       json={})
-        failed_indices = index_operations.create_indices(test_constants.BASE_INDICES_DATA,
+        failed_indices = index_management.create_indices(test_constants.BASE_INDICES_DATA,
                                                          EndpointInfo(test_constants.TARGET_ENDPOINT))
         # Verify that failed indices are returned with their respective errors
         self.assertEqual(1, len(failed_indices))
@@ -114,7 +115,7 @@ def test_doc_count(self):
                                "aggregations": {"count": {"buckets": test_buckets}}}
         responses.get(expected_count_endpoint, json=mock_count_response)
         # Now send request
-        doc_count_result = index_operations.doc_count(test_indices, EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        doc_count_result = index_management.doc_count(test_indices, EndpointInfo(test_constants.SOURCE_ENDPOINT))
         self.assertEqual(total_docs, doc_count_result.total)
 
     @responses.activate
@@ -122,34 +123,43 @@ def test_doc_count_error(self):
         test_indices = {test_constants.INDEX1_NAME, test_constants.INDEX2_NAME}
         expected_count_endpoint = test_constants.SOURCE_ENDPOINT + ",".join(test_indices) + "/_search"
         responses.get(expected_count_endpoint, body=requests.Timeout())
-        self.assertRaises(RuntimeError, index_operations.doc_count, test_indices,
-                          EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        try:
+            index_management.doc_count(test_indices, EndpointInfo(test_constants.SOURCE_ENDPOINT))
+            # test should not reach this line
+            self.fail("Expected exception not thrown")
+        except IndexManagementError as e:
+            self.assertIsNotNone(e.__cause__)
+            self.assertTrue(isinstance(e.__cause__, RequestError))
 
     @responses.activate
     def test_get_request_errors(self):
         # Set up list of error types to test
         test_errors = [requests.ConnectionError(), requests.HTTPError(), requests.Timeout(),
                        requests.exceptions.MissingSchema()]
-        # Verify that each error is wrapped in a RuntimeError
+        # Verify that each error is wrapped in an IndexManagementError, then a RequestError
         for e in test_errors:
             responses.get(test_constants.SOURCE_ENDPOINT + "*", body=e)
-            self.assertRaises(RuntimeError, index_operations.fetch_all_indices,
-                              EndpointInfo(test_constants.SOURCE_ENDPOINT))
+            try:
+                index_management.fetch_all_indices(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+            except IndexManagementError as ime:
+                self.assertIsNotNone(ime.__cause__)
+                self.assertTrue(isinstance(ime.__cause__, RequestError))
+                self.assertIsNotNone(ime.__cause__.__cause__)
 
     @responses.activate
     def test_fetch_all_component_templates_empty(self):
         # 1 - Empty response
         responses.get(test_constants.SOURCE_ENDPOINT + "_component_template", json={})
-        result = index_operations.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        result = index_management.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
         # Missing key returns empty result
         self.assertEqual(0, len(result))
         # 2 - Valid response structure but no templates
         responses.get(test_constants.SOURCE_ENDPOINT + "_component_template", json={"component_templates": []})
-        result = index_operations.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        result = index_management.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
         self.assertEqual(0, len(result))
         # 2 - Invalid response structure
         responses.get(test_constants.SOURCE_ENDPOINT + "_component_template", json={"templates": []})
-        result = index_operations.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        result = index_management.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
         self.assertEqual(0, len(result))
 
     @responses.activate
@@ -158,7 +168,7 @@ def test_fetch_all_component_templates(self):
         test_index = test_constants.BASE_INDICES_DATA[test_constants.INDEX3_NAME]
         test_resp = create_base_template_response("component_templates", test_index)
         responses.get(test_constants.SOURCE_ENDPOINT + "_component_template", json=test_resp)
-        result = index_operations.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        result = index_management.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
         # Result should contain one template
         self.assertEqual(1, len(result))
         template = result.pop()
@@ -172,16 +182,16 @@ def test_fetch_all_index_templates_empty(self):
         # 1 - Empty response
         responses.get(test_constants.SOURCE_ENDPOINT + "_index_template", json={})
-        result = index_operations.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        result = index_management.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
         # Missing key returns empty result
         self.assertEqual(0, len(result))
         # 2 - Valid response structure but no templates
         responses.get(test_constants.SOURCE_ENDPOINT + "_index_template", json={"index_templates": []})
-        result = index_operations.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        result = index_management.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
         self.assertEqual(0, len(result))
         # 2 - Invalid response structure
         responses.get(test_constants.SOURCE_ENDPOINT + "_index_template", json={"templates": []})
-        result = index_operations.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        result = index_management.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
         self.assertEqual(0, len(result))
 
     @responses.activate
@@ -197,7 +207,7 @@ def test_fetch_all_index_templates(self):
         template_body["index_patterns"] = [test_index_pattern]
         template_body["composed_of"] = [test_component_template_name]
         responses.get(test_constants.SOURCE_ENDPOINT + "_index_template", json=test_resp)
-        result = index_operations.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        result = index_management.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
         # Result should contain one template
         self.assertEqual(1, len(result))
         template = result.pop()
@@ -213,14 +223,12 @@ def test_fetch_all_templates_errors(self):
         responses.get(test_constants.SOURCE_ENDPOINT + "_component_template", body=requests.Timeout())
         responses.get(test_constants.SOURCE_ENDPOINT + "_index_template", body=requests.HTTPError())
         try:
-            self.assertRaises(RuntimeError, index_operations.fetch_all_component_templates,
-                              EndpointInfo(test_constants.SOURCE_ENDPOINT))
-        except RuntimeError as e:
+            index_management.fetch_all_component_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        except IndexManagementError as e:
             self.assertIsNotNone(e.__cause__)
         try:
-            self.assertRaises(RuntimeError, index_operations.fetch_all_index_templates,
-                              EndpointInfo(test_constants.SOURCE_ENDPOINT))
-        except RuntimeError as e:
+            index_management.fetch_all_index_templates(EndpointInfo(test_constants.SOURCE_ENDPOINT))
+        except IndexManagementError as e:
             self.assertIsNotNone(e.__cause__)
 
     @responses.activate
@@ -240,7 +248,7 @@ def test_create_templates(self):
                       match=[matchers.json_params_matcher(test1_template_def)])
         responses.put(test_constants.TARGET_ENDPOINT + "_component_template/test2",
                       match=[matchers.json_params_matcher(test2_template_def)])
-        failed = index_operations.create_component_templates(test_templates,
+        failed = index_management.create_component_templates(test_templates,
                                                              EndpointInfo(test_constants.TARGET_ENDPOINT))
         self.assertEqual(0, len(failed))
         # Also test index templates
@@ -252,7 +260,7 @@ def test_create_templates(self):
                       match=[matchers.json_params_matcher(test1_template_def)])
         responses.put(test_constants.TARGET_ENDPOINT + "_index_template/test2",
                       match=[matchers.json_params_matcher(test2_template_def)])
-        failed = index_operations.create_index_templates(test_templates, EndpointInfo(test_constants.TARGET_ENDPOINT))
+        failed = index_management.create_index_templates(test_templates, EndpointInfo(test_constants.TARGET_ENDPOINT))
         self.assertEqual(0, len(failed))
 
     @responses.activate
@@ -261,13 +269,13 @@ def test_create_templates_failure(self):
         responses.put(test_constants.TARGET_ENDPOINT + "_component_template/test1", body=requests.Timeout())
         responses.put(test_constants.TARGET_ENDPOINT + "_index_template/test2", body=requests.HTTPError())
         test_input = ComponentTemplateInfo({"name": "test1", "component_template": {}})
-        failed = index_operations.create_component_templates({test_input}, EndpointInfo(test_constants.TARGET_ENDPOINT))
+        failed = index_management.create_component_templates({test_input}, EndpointInfo(test_constants.TARGET_ENDPOINT))
         # Verify that failures return their respective errors
         self.assertEqual(1, len(failed))
         self.assertTrue("test1" in failed)
         self.assertTrue(isinstance(failed["test1"], requests.Timeout))
         test_input = IndexTemplateInfo({"name": "test2", "index_template": {}})
-        failed = index_operations.create_index_templates({test_input}, EndpointInfo(test_constants.TARGET_ENDPOINT))
+        failed = index_management.create_index_templates({test_input}, EndpointInfo(test_constants.TARGET_ENDPOINT))
         # Verify that failures return their respective errors
         self.assertEqual(1, len(failed))
         self.assertTrue("test2" in failed)
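The assertions on e.__cause__ and e.__cause__.__cause__ in the tests above imply that index_management chains exceptions with "raise ... from ..." rather than swallowing them. A rough sketch of the assumed wrapping pattern, shown with a plain URL string instead of EndpointInfo for brevity (the helper name __send_get_request and the messages are hypothetical; the real index_management.py is not part of this hunk):

    import requests

    from exceptions import IndexManagementError, RequestError


    def __send_get_request(url: str) -> requests.Response:
        try:
            response = requests.get(url)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            # RequestError carries the underlying requests failure as its __cause__
            raise RequestError(f"GET request to {url} failed") from e


    def fetch_all_indices(url: str) -> dict:
        try:
            return __send_get_request(url + "*").json()
        except RequestError as e:
            # Produces the chain IndexManagementError -> RequestError -> requests exception,
            # which test_get_request_errors walks via __cause__ and __cause__.__cause__
            raise IndexManagementError("Failed to fetch indices from cluster") from e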
diff --git a/FetchMigration/python/tests/test_metadata_migration.py b/FetchMigration/python/tests/test_metadata_migration.py
index 5d5337181..02e276e98 100644
--- a/FetchMigration/python/tests/test_metadata_migration.py
+++ b/FetchMigration/python/tests/test_metadata_migration.py
@@ -17,6 +17,7 @@
 import metadata_migration
 from endpoint_info import EndpointInfo
+from exceptions import MetadataMigrationError
 from index_doc_count import IndexDocCount
 from metadata_migration_params import MetadataMigrationParams
 from tests import test_constants
@@ -33,11 +34,11 @@ def tearDown(self) -> None:
         logging.disable(logging.NOTSET)
 
     @patch('metadata_migration.template_migration')
-    @patch('index_operations.doc_count')
+    @patch('index_management.doc_count')
     @patch('metadata_migration.write_output')
     @patch('metadata_migration.print_report')
-    @patch('index_operations.create_indices')
-    @patch('index_operations.fetch_all_indices')
+    @patch('index_management.create_indices')
+    @patch('index_management.fetch_all_indices')
     # Note that mock objects are passed bottom-up from the patch order above
     def test_run_report(self, mock_fetch_indices: MagicMock, mock_create_indices: MagicMock,
                         mock_print_report: MagicMock, mock_write_output: MagicMock, mock_doc_count: MagicMock,
@@ -63,10 +64,10 @@ def test_run_report(self, mock_fetch_indices: MagicMock, mock_create_indices: Ma
         mock_write_output.assert_not_called()
         mock_template_migration.assert_called_once()
 
-    @patch('index_operations.doc_count')
+    @patch('index_management.doc_count')
     @patch('metadata_migration.print_report')
     @patch('metadata_migration.write_output')
-    @patch('index_operations.fetch_all_indices')
+    @patch('index_management.fetch_all_indices')
     # Note that mock objects are passed bottom-up from the patch order above
     def test_run_dryrun(self, mock_fetch_indices: MagicMock, mock_write_output: MagicMock,
                         mock_print_report: MagicMock, mock_doc_count: MagicMock):
@@ -88,10 +89,10 @@ def test_run_dryrun(self, mock_fetch_indices: MagicMock, mock_write_output: Magi
         # Report should not be printed
         mock_print_report.assert_not_called()
 
-    @patch('index_operations.doc_count')
+    @patch('index_management.doc_count')
     @patch('metadata_migration.print_report')
     @patch('metadata_migration.write_output')
-    @patch('index_operations.fetch_all_indices')
+    @patch('index_management.fetch_all_indices')
     # Note that mock objects are passed bottom-up from the patch order above
     def test_identical_empty_index(self, mock_fetch_indices: MagicMock, mock_write_output: MagicMock,
                                    mock_print_report: MagicMock, mock_doc_count: MagicMock):
@@ -147,7 +148,7 @@ def test_missing_output_file_non_report(self):
         self.assertRaises(ValueError, metadata_migration.run, test_input)
 
     @patch('metadata_migration.template_migration')
-    @patch('index_operations.fetch_all_indices')
+    @patch('index_management.fetch_all_indices')
     # Note that mock objects are passed bottom-up from the patch order above
     def test_no_indices_in_source(self, mock_fetch_indices: MagicMock, mock_template_migration: MagicMock):
         mock_fetch_indices.return_value = {}
@@ -161,9 +162,9 @@ def test_no_indices_in_source(self, mock_fetch_indices: MagicMock, mock_template
 
     @patch('metadata_migration.write_output')
     @patch('metadata_migration.template_migration')
-    @patch('index_operations.doc_count')
-    @patch('index_operations.create_indices')
-    @patch('index_operations.fetch_all_indices')
+    @patch('index_management.doc_count')
+    @patch('index_management.create_indices')
+    @patch('index_management.fetch_all_indices')
     # Note that mock objects are passed bottom-up from the patch order above
     def test_failed_indices(self, mock_fetch_indices: MagicMock, mock_create_indices: MagicMock,
                             mock_doc_count: MagicMock, mock_template_migration: MagicMock,
@@ -179,14 +180,14 @@ def test_failed_indices(self, mock_fetch_indices: MagicMock, mock_create_indices
         # Fetch indices is called first for source, then for target
         mock_fetch_indices.side_effect = [test_constants.BASE_INDICES_DATA, {}]
         test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH, "dummy")
-        self.assertRaises(RuntimeError, metadata_migration.run, test_input)
+        self.assertRaises(MetadataMigrationError, metadata_migration.run, test_input)
         mock_create_indices.assert_called_once_with(test_constants.BASE_INDICES_DATA, ANY)
         mock_template_migration.assert_not_called()
 
-    @patch('index_operations.create_index_templates')
-    @patch('index_operations.fetch_all_index_templates')
-    @patch('index_operations.create_component_templates')
-    @patch('index_operations.fetch_all_component_templates')
+    @patch('index_management.create_index_templates')
+    @patch('index_management.fetch_all_index_templates')
+    @patch('index_management.create_component_templates')
+    @patch('index_management.fetch_all_component_templates')
     # Note that mock objects are passed bottom-up from the patch order above
     def test_template_migration(self, fetch_component: MagicMock, create_component: MagicMock,
                                 fetch_index: MagicMock, create_index: MagicMock):
@@ -203,10 +204,10 @@ def test_template_migration(self, fetch_component: MagicMock, create_component:
         create_component.assert_called_once_with(ANY, target)
         create_index.assert_called_once_with(ANY, target)
 
-    @patch('index_operations.create_index_templates')
-    @patch('index_operations.fetch_all_index_templates')
-    @patch('index_operations.create_component_templates')
-    @patch('index_operations.fetch_all_component_templates')
+    @patch('index_management.create_index_templates')
+    @patch('index_management.fetch_all_index_templates')
+    @patch('index_management.create_component_templates')
+    @patch('index_management.fetch_all_component_templates')
     # Note that mock objects are passed bottom-up from the patch order above
     def test_component_template_migration_failed(self, fetch_component: MagicMock, create_component: MagicMock,
                                                  fetch_index: MagicMock, create_index: MagicMock):
@@ -215,17 +216,17 @@ def test_component_template_migration_failed(self, fetch_component: MagicMock, c
         # Create component index call returns a failure
         create_component.return_value = {"test-template": requests.Timeout()}
         # Expect the migration to throw RuntimeError
-        self.assertRaises(RuntimeError, metadata_migration.template_migration, source, target)
+        self.assertRaises(MetadataMigrationError, metadata_migration.template_migration, source, target)
         fetch_component.assert_called_once_with(source)
         create_component.assert_called_once_with(ANY, target)
         # Index migration should never occur
         fetch_index.assert_not_called()
         create_index.assert_not_called()
 
-    @patch('index_operations.create_index_templates')
-    @patch('index_operations.fetch_all_index_templates')
-    @patch('index_operations.create_component_templates')
-    @patch('index_operations.fetch_all_component_templates')
+    @patch('index_management.create_index_templates')
+    @patch('index_management.fetch_all_index_templates')
+    @patch('index_management.create_component_templates')
+    @patch('index_management.fetch_all_component_templates')
     # Note that mock objects are passed bottom-up from the patch order above
     def test_index_template_migration_failed(self, fetch_component: MagicMock, create_component: MagicMock,
                                              fetch_index: MagicMock, create_index: MagicMock):
@@ -234,7 +235,7 @@ def test_index_template_migration_failed(self, fetch_component: MagicMock, creat
         # Create component index call returns a failure
         create_index.return_value = {"test-template": requests.Timeout()}
         # Expect the migration to throw RuntimeError
-        self.assertRaises(RuntimeError, metadata_migration.template_migration, source, target)
+        self.assertRaises(MetadataMigrationError, metadata_migration.template_migration, source, target)
         # All mocks should be invoked
         fetch_component.assert_called_once_with(source)
         fetch_index.assert_called_once_with(source)
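The updated tests expect metadata_migration.run and template_migration to raise MetadataMigrationError rather than a bare RuntimeError. A caller-side sketch of how the narrower type might be consumed (a hypothetical wrapper, not part of this patch; the real entry point may handle failures differently):

    import logging

    import metadata_migration
    from exceptions import MetadataMigrationError
    from metadata_migration_params import MetadataMigrationParams


    def migrate_metadata_or_exit(config_path: str, report_path: str) -> None:
        # Hypothetical entry point; argument handling in the real tool may differ.
        try:
            metadata_migration.run(MetadataMigrationParams(config_path, report_path))
        except MetadataMigrationError as e:
            # Catching the specific type avoids masking unrelated RuntimeErrors
            logging.error("Metadata migration failed: %s (caused by: %s)", e, e.__cause__)
            raise SystemExit(1) from e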
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/showFetchMigrationCommand.sh b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/showFetchMigrationCommand.sh
index 5b208320c..a6700496b 100755
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/showFetchMigrationCommand.sh
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/showFetchMigrationCommand.sh
@@ -14,6 +14,10 @@ usage() {
   exit 1
 }
 
+# Default values
+create_only=false
+dry_run=false
+
 while [[ $# -gt 0 ]]; do
   key="$1"
   case $key in
@@ -47,10 +51,6 @@ fi
 
 # ECS command overrides argument with placeholder for flags
 OVERRIDES_ARG="--overrides '{ \"containerOverrides\": [ { \"name\": \"fetch-migration\", \"command\": }]}'"
-# Default values
-create_only=false
-dry_run=false
-
 # Build flags string
 flags=""
 if [ "$dry_run" = true ]; then