Add support for Kafka commands in console lib (#775)
Add basic Kafka command structure to console library

root@0abbc4d082d3:~# console kafka describe-topic-records
TOPIC                          PARTITION  RECORDS   
logging-traffic-topic          0          5         
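
The remaining commands follow the same pattern; for example (invocations only,
output omitted, using the default topic and group names from the CLI options):

root@0abbc4d082d3:~# console kafka create-topic --topic-name logging-traffic-topic
root@0abbc4d082d3:~# console kafka delete-topic --acknowledge-risk
root@0abbc4d082d3:~# console kafka describe-consumer-group --group-name logging-group-default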

---------

Signed-off-by: Tanner Lewis <[email protected]>
lewijacn authored Jun 26, 2024
1 parent 8c323d7 commit 99a2575
Showing 11 changed files with 299 additions and 9 deletions.
@@ -76,7 +76,7 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
-command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id default-logging-group --otelCollectorEndpoint http://otel-collector:4317"
+command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317"

opensearchtarget:
image: 'opensearchproject/opensearch:latest'
@@ -351,6 +351,59 @@ def get_metrics_data_cmd(ctx, component, metric_name, statistic, lookback):
metric_data
)

# ##################### KAFKA ###################


@cli.group(name="kafka")
@click.pass_obj
def kafka_group(ctx):
"""All actions related to Kafka operations"""
if ctx.env.kafka is None:
raise click.UsageError("Kafka is not set")


@kafka_group.command(name="create-topic")
@click.option('--topic-name', default="logging-traffic-topic", help='Specify a topic name to create')
@click.pass_obj
def create_topic_cmd(ctx, topic_name):
result = ctx.env.kafka.create_topic(topic_name=topic_name)
click.echo(result.value)


@kafka_group.command(name="delete-topic")
@click.option("--acknowledge-risk", is_flag=True, show_default=True, default=False,
help="Flag to acknowledge risk and skip confirmation")
@click.option('--topic-name', default="logging-traffic-topic", help='Specify a topic name to delete')
@click.pass_obj
def delete_topic_cmd(ctx, acknowledge_risk, topic_name):
if acknowledge_risk:
result = ctx.env.kafka.delete_topic(topic_name=topic_name)
click.echo(result.value)
else:
if click.confirm('Deleting a topic will irreversibly delete all captured traffic records stored in that '
'topic. Are you sure you want to continue?'):
click.echo(f"Performing delete topic operation on {topic_name} topic...")
result = ctx.env.kafka.delete_topic(topic_name=topic_name)
click.echo(result.value)
else:
click.echo("Aborting command.")


@kafka_group.command(name="describe-consumer-group")
@click.option('--group-name', default="logging-group-default", help='Specify a group name to describe')
@click.pass_obj
def describe_group_command(ctx, group_name):
result = ctx.env.kafka.describe_consumer_group(group_name=group_name)
click.echo(result.value)


@kafka_group.command(name="describe-topic-records")
@click.option('--topic-name', default="logging-traffic-topic", help='Specify a topic name to describe')
@click.pass_obj
def describe_topic_records_cmd(ctx, topic_name):
result = ctx.env.kafka.describe_topic_records(topic_name=topic_name)
click.echo(result.value)

# ##################### UTILITIES ###################


@@ -8,6 +8,7 @@
from console_link.models.snapshot import FileSystemSnapshot, Snapshot, S3Snapshot
from console_link.models.replayer_base import Replayer
from console_link.models.replayer_ecs import ECSReplayer
from console_link.models.kafka import Kafka, MSK, StandardKafka
import yaml
from cerberus import Validator

@@ -28,14 +29,21 @@ def get_replayer(config: Dict):
raise ValueError("Invalid replayer config")


def get_kafka(config: Dict):
if 'msk' in config:
return MSK(config)
return StandardKafka(config)
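
# Illustrative dispatch (mirroring the services.yaml change below): a config containing
# an 'msk' key, e.g. {"broker_endpoints": "kafka:9092", "msk": None}, yields MSK, while
# one with a 'standard' key yields StandardKafka.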


SCHEMA = {
"source_cluster": {"type": "dict", "required": False},
"target_cluster": {"type": "dict", "required": True},
"backfill": {"type": "dict", "required": False},
"metrics_source": {"type": "dict", "required": False},
"snapshot": {"type": "dict", "required": False},
"metadata_migration": {"type": "dict", "required": False},
-"replay": {"type": "dict", "required": False}
+"replay": {"type": "dict", "required": False},
+"kafka": {"type": "dict", "required": False}
}


@@ -47,6 +55,7 @@ class Environment:
snapshot: Optional[Snapshot] = None
metadata: Optional[Metadata] = None
replay: Optional[Replayer] = None
kafka: Optional[Kafka] = None

def __init__(self, config_file: str):
logger.info(f"Loading config file: {config_file}")
@@ -100,3 +109,6 @@ def __init__(self, config_file: str):
self.metadata: Metadata = Metadata(self.config["metadata_migration"],
target_cluster=self.target_cluster,
snapshot=self.snapshot)
if 'kafka' in self.config:
self.kafka: Kafka = get_kafka(self.config["kafka"])
logger.info(f"Kafka initialized: {self.kafka}")
@@ -0,0 +1,25 @@
import logging
from console_link.models.kafka import Kafka
from console_link.models.command_result import CommandResult

logger = logging.getLogger(__name__)


def create_topic(kafka: Kafka, topic_name: str) -> CommandResult:
result = kafka.create_topic(topic_name=topic_name)
return result


def delete_topic(kafka: Kafka, topic_name: str) -> CommandResult:
result = kafka.delete_topic(topic_name=topic_name)
return result


def describe_consumer_group(kafka: Kafka, group_name: str) -> CommandResult:
result = kafka.describe_consumer_group(group_name=group_name)
return result


def describe_topic_records(kafka: Kafka, topic_name: str) -> CommandResult:
result = kafka.describe_topic_records(topic_name=topic_name)
return result
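
# These wrappers are thin pass-throughs to the Kafka model; an illustrative call
# site would be describe_topic_records(env.kafka, topic_name="logging-traffic-topic").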
@@ -0,0 +1,170 @@
import subprocess
from typing import List

from cerberus import Validator
import logging
from abc import ABC, abstractmethod
from console_link.models.command_result import CommandResult
from console_link.models.schema_tools import contains_one_of

logger = logging.getLogger(__name__)

MSK_SCHEMA = {
"nullable": True,
}

STANDARD_SCHEMA = {
"nullable": True,
}

SCHEMA = {
'kafka': {
'type': 'dict',
'schema': {
'broker_endpoints': {"type": "string", "required": True},
'msk': MSK_SCHEMA,
'standard': STANDARD_SCHEMA
},
'check_with': contains_one_of({'msk', 'standard'})
}
}


def get_result_for_command(command: List[str], operation_name: str) -> CommandResult:
try:
cmd_output = subprocess.run(command, capture_output=True, text=True, check=True)
output = cmd_output.stdout
message = f"{operation_name} command completed successfully"
logger.info(message)
if not output:
output = f"Command for {operation_name} completed successfully.\n"
return CommandResult(success=True, value=output)
except subprocess.CalledProcessError as e:
message = f"Failed to perform {operation_name} command: {str(e)}. Standard error output: {e.stderr}"
logger.error(message)
output = e.stdout
return CommandResult(success=False, value=output)
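
# Note: with check=True, any non-zero exit raises CalledProcessError, so the except
# branch above returns a failed CommandResult that still carries the command's stdout.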


def pretty_print_kafka_record_count(data: str) -> str:
# Split the data into lines
lines = data.split("\n")

# Define headers
headers = ["TOPIC", "PARTITION", "RECORDS"]

# Initialize the formatted output with headers
formatted_output = "{:<30} {:<10} {:<10}".format(*headers) + "\n"

# Format each line of data
for line in lines:
if line and line.count(":") == 2:
topic, partition, records = line.split(":")
formatted_output += "{:<30} {:<10} {:<10}".format(topic, partition, records) + "\n"
return formatted_output
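
# Example: a GetOffsetShell output line such as "logging-traffic-topic:0:5" becomes
# the TOPIC/PARTITION/RECORDS row shown in the commit message above.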


class Kafka(ABC):
"""
Interface for Kafka command line operations
"""

def __init__(self, config):
logger.info(f"Initializing Kafka with config: {config}")
v = Validator(SCHEMA)
if not v.validate({'kafka': config}):
logger.error(f"Invalid config: {v.errors}")
raise ValueError(v.errors)
self.brokers = config.get('broker_endpoints')

@abstractmethod
def delete_topic(self, topic_name='logging-traffic-topic') -> CommandResult:
pass

@abstractmethod
def create_topic(self, topic_name='logging-traffic-topic') -> CommandResult:
pass

@abstractmethod
def describe_consumer_group(self, group_name='logging-group-default') -> CommandResult:
pass

@abstractmethod
def describe_topic_records(self, topic_name='logging-traffic-topic') -> CommandResult:
pass


class MSK(Kafka):
"""
AWS MSK implementation of Kafka operations
"""

def __init__(self, config):
super().__init__(config)

def delete_topic(self, topic_name='logging-traffic-topic') -> CommandResult:
command = ['/root/kafka-tools/kafka/bin/kafka-topics.sh', '--bootstrap-server', f'{self.brokers}', '--delete',
'--topic', f'{topic_name}', '--command-config', '/root/kafka-tools/aws/msk-iam-auth.properties']
logger.info(f"Executing command: {command}")
return get_result_for_command(command, "Delete Topic")

def create_topic(self, topic_name='logging-traffic-topic') -> CommandResult:
command = ['/root/kafka-tools/kafka/bin/kafka-topics.sh', '--bootstrap-server', f'{self.brokers}', '--create',
'--topic', f'{topic_name}', '--command-config', '/root/kafka-tools/aws/msk-iam-auth.properties']
logger.info(f"Executing command: {command}")
return get_result_for_command(command, "Create Topic")

def describe_consumer_group(self, group_name='logging-group-default') -> CommandResult:
command = ['/root/kafka-tools/kafka/bin/kafka-consumer-groups.sh', '--bootstrap-server', f'{self.brokers}',
'--timeout', '100000', '--describe', '--group', f'{group_name}', '--command-config',
'/root/kafka-tools/aws/msk-iam-auth.properties']
logger.info(f"Executing command: {command}")
return get_result_for_command(command, "Describe Consumer Group")

def describe_topic_records(self, topic_name='logging-traffic-topic') -> CommandResult:
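# GetOffsetShell with '--time -1' reports the latest offset of each partition, which
# equals the record count so long as no records have been removed from the topic.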
command = ['/root/kafka-tools/kafka/bin/kafka-run-class.sh', 'kafka.tools.GetOffsetShell', '--broker-list',
f'{self.brokers}', '--topic', f'{topic_name}', '--time', '-1', '--command-config',
'/root/kafka-tools/aws/msk-iam-auth.properties']
logger.info(f"Executing command: {command}")
result = get_result_for_command(command, "Describe Topic Records")
if result.success and result.value:
pretty_value = pretty_print_kafka_record_count(result.value)
return CommandResult(success=result.success, value=pretty_value)
return result


class StandardKafka(Kafka):
"""
Standard Kafka distribution implementation of Kafka operations
"""

def __init__(self, config):
super().__init__(config)

def delete_topic(self, topic_name='logging-traffic-topic') -> CommandResult:
command = ['/root/kafka-tools/kafka/bin/kafka-topics.sh', '--bootstrap-server', f'{self.brokers}', '--delete',
'--topic', f'{topic_name}']
logger.info(f"Executing command: {command}")
return get_result_for_command(command, "Delete Topic")

def create_topic(self, topic_name='logging-traffic-topic') -> CommandResult:
command = ['/root/kafka-tools/kafka/bin/kafka-topics.sh', '--bootstrap-server', f'{self.brokers}', '--create',
'--topic', f'{topic_name}']
logger.info(f"Executing command: {command}")
return get_result_for_command(command, "Create Topic")

def describe_consumer_group(self, group_name='logging-group-default') -> CommandResult:
command = ['/root/kafka-tools/kafka/bin/kafka-consumer-groups.sh', '--bootstrap-server', f'{self.brokers}',
'--timeout', '100000', '--describe', '--group', f'{group_name}']
logger.info(f"Executing command: {command}")
return get_result_for_command(command, "Describe Consumer Group")

def describe_topic_records(self, topic_name='logging-traffic-topic') -> CommandResult:
command = ['/root/kafka-tools/kafka/bin/kafka-run-class.sh', 'kafka.tools.GetOffsetShell', '--broker-list',
f'{self.brokers}', '--topic', f'{topic_name}', '--time', '-1']
logger.info(f"Executing command: {command}")
result = get_result_for_command(command, "Describe Topic Records")
if result.success and result.value:
pretty_value = pretty_print_kafka_record_count(result.value)
return CommandResult(success=result.success, value=pretty_value)
return result
@@ -23,3 +23,6 @@ snapshot:
metadata_migration:
from_snapshot:
min_replicas: 0
kafka:
broker_endpoints: "kafka:9092"
standard: ""
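# (the empty 'standard' key selects the standard Kafka client; 'msk: ""' would select AWS MSK instead)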
@@ -90,6 +90,18 @@ export class MetadataMigrationYaml {
min_replicas: number = 1;
}

export class MSKYaml {
}

export class StandardKafkaYaml {
}

export class KafkaYaml {
broker_endpoints: string = '';
msk?: string | null;
standard?: string | null;
}
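
// For reference (an assumption about the yaml library's output), a KafkaYaml with
// msk = '' should stringify via ServicesYaml below roughly as:
//   kafka:
//     broker_endpoints: "kafka:9092"
//     msk: ""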

export class ServicesYaml {
source_cluster: ClusterYaml;
target_cluster: ClusterYaml;
@@ -98,6 +110,7 @@ export class ServicesYaml {
snapshot?: SnapshotYaml;
metadata_migration?: MetadataMigrationYaml;
replayer?: ECSReplayerYaml;
kafka?: KafkaYaml;

stringify(): string {
return yaml.stringify({
@@ -107,7 +120,8 @@
backfill: this.backfill?.toDict(),
snapshot: this.snapshot?.toDict(),
metadata_migration: this.metadata_migration,
-replay: this.replayer?.toDict()
+replay: this.replayer?.toDict(),
+kafka: this.kafka
},
{
'nullStr': ''
@@ -8,6 +8,7 @@ import {Runtime} from "aws-cdk-lib/aws-lambda";
import {Provider} from "aws-cdk-lib/custom-resources";
import * as path from "path";
import { createMigrationStringParameter, getMigrationStringParameterValue, MigrationSSMParameter } from "./common-utilities";
import {KafkaYaml} from "./migration-services-yaml";

export interface MskUtilityStackProps extends StackPropsExt {
readonly vpc: IVpc,
@@ -20,6 +21,7 @@ export interface MskUtilityStackProps extends StackPropsExt {
* a consistent ORDERED fashion.
*/
export class MSKUtilityStack extends Stack {
kafkaYaml: KafkaYaml;

constructor(scope: Construct, id: string, props: MskUtilityStackProps) {
super(scope, id, props);
@@ -152,5 +154,8 @@ export class MSKUtilityStack extends Stack {
...props,
parameter: MigrationSSMParameter.KAFKA_BROKERS
});
this.kafkaYaml = new KafkaYaml();
this.kafkaYaml.msk = '';
this.kafkaYaml.broker_endpoints = brokerEndpoints;
}
}
}