Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist demoted and removed brokers #2115

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions config/cruisecontrol.properties
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,83 @@ num.concurrent.leader.movements=1000
execution.progress.check.interval.ms=10000


# Configuration for persistent data
# =======================================

# The method to use to store persisted data. This is the first "persisted.data" config to set,
# and its value determines which other configs in the series need to be configured. The available options are:
# "kafka",
# "memory"
# The default is "memory", which doesn't durably persist any runtime data.
persisted.data.persist.method=memory

# The name of the kafka topic to use to persist data when "persisted.data.persist.method=kafka".
# If the topic is not present, then it will be created.
persisted.data.kafka.topic.name=__CruiseControlPersistentData

# The number of partitions to ensure are present for the kafka topic. Only applies when
# "persisted.data.persist.method=kafka". If the topic has fewer than this number of partitions,
# then partitions will be added.
persisted.data.kafka.topic.partition.count=1

# The replication factor to use for the kafka topic. Only applies when
# "persisted.data.persist.method=kafka". Multiple partition replicas are desirable to ensure the
# topic is reasonably available.
persisted.data.kafka.topic.replication.factor=3

# The configs to apply to the kafka topic used to persist Cruise Control data. Only applies if
# "persisted.data.persist.method=kafka". All valid topic config keys and values can be used to
# apply to the "persisted.data.kafka.topic.name" topic. Use this prefix string with the topic
# config keys.
# e.g. persisted.data.kafka.topic.config.min.insync.replicas=2
# See: <a href="https://kafka.apache.org/documentation/#topicconfigs">Kafka topic config
# documentation</a>.
#
# persisted.data.kafka.topic.config.K=V
persisted.data.kafka.topic.config.cleanup.policy=compact

# The configs to use when creating the kafka producer to persist Cruise Control data. Only
# applies if "persisted.data.persist.method=kafka". The keys and values need to be valid Kafka
# Producer configs. All valid <a
# href="https://kafka.apache.org/documentation/#producerconfigs">Kafka producer config</a> keys
# can be prefixed, and they will be applied to the producer.
# e.g. persisted.data.kafka.producer.config.compression.type=gzip
#
# persisted.data.kafka.producer.config.K=V
persisted.data.kafka.producer.config.acks=all
persisted.data.kafka.producer.config.buffer.memory=2000000
persisted.data.kafka.producer.config.client.id=kafka-cruise-control.kafka-persisted-map.producer
persisted.data.kafka.producer.config.compression.type=gzip
persisted.data.kafka.producer.config.delivery.timeout.ms=3600000
persisted.data.kafka.producer.config.key.serializer=StringSerializer
persisted.data.kafka.producer.config.max.in.flight.requests.per.connection=2
persisted.data.kafka.producer.config.max.request.size=1000000
persisted.data.kafka.producer.config.reconnect.backoff.ms=1000
persisted.data.kafka.producer.config.retries=2147483647
persisted.data.kafka.producer.config.retry.backoff.ms=1000
persisted.data.kafka.producer.config.value.serializer=StringSerializer

# The configs to use when creating the kafka consumer to read persisted Cruise Control data.
# Only applies if "persisted.data.persist.method=kafka". The keys and values need to be valid
# Kafka Consumer configs. All valid <a
# href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka consumer config</a> keys
# can be prefixed, and they will be applied to the consumer.
# e.g. persisted.data.kafka.consumer.max.poll.records=100
#
# persisted.data.kafka.consumer.K=V
persisted.data.kafka.consumer.allow.auto.create.topics=false
persisted.data.kafka.consumer.auto.offset.reset=earliest
persisted.data.kafka.consumer.client.id=kafka-cruise-control.kafka-persisted-map.consumer
persisted.data.kafka.consumer.enable.auto.commit=false
persisted.data.kafka.consumer.exclude.internal.topics=false
persisted.data.kafka.consumer.group.id=kafka-cruise-control.kafka-persisted-map.consumer-group
persisted.data.kafka.consumer.isolation.level=read_committed
persisted.data.kafka.consumer.key.deserializer=StringDeserializer
persisted.data.kafka.consumer.max.poll.records=2147483647
persisted.data.kafka.consumer.reconnect.backoff.ms=1000
persisted.data.kafka.consumer.value.deserializer=StringDeserializer


# Configurations for anomaly detector
# =======================================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.linkedin.kafka.cruisecontrol.persisteddata.PersistedMapFactory;
import com.linkedin.kafka.cruisecontrol.persisteddata.namespace.ExecutorPersistedData;
import com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.io.InputStream;
Expand Down Expand Up @@ -82,6 +84,7 @@ public class KafkaCruiseControl {
private final GoalOptimizer _goalOptimizer;
private final ExecutorService _goalOptimizerExecutor;
private final Executor _executor;
private final PersistedMapFactory _persistedMapFactory;
private final AnomalyDetectorManager _anomalyDetectorManager;
private final Time _time;
private final AdminClient _adminClient;
Expand Down Expand Up @@ -122,7 +125,9 @@ public KafkaCruiseControl(KafkaCruiseControlConfig config, MetricRegistry dropwi
Provisioner.class,
Collections.singletonMap(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, this));
_anomalyDetectorManager = new AnomalyDetectorManager(this, _time, dropwizardMetricRegistry);
_executor = new Executor(config, _time, dropwizardMetricRegistry, _anomalyDetectorManager);
_persistedMapFactory = new PersistedMapFactory(config, _adminClient);
_executor = new Executor(config, _time, dropwizardMetricRegistry, _anomalyDetectorManager,
new ExecutorPersistedData(_persistedMapFactory.instance()));
_loadMonitor = new LoadMonitor(config, _time, dropwizardMetricRegistry, KafkaMetricDef.commonMetricDef());
_goalOptimizerExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("GoalOptimizerExecutor"));
_goalOptimizer = new GoalOptimizer(config, _loadMonitor, _time, dropwizardMetricRegistry, _executor, _adminClient);
Expand All @@ -138,11 +143,13 @@ public KafkaCruiseControl(KafkaCruiseControlConfig config, MetricRegistry dropwi
LoadMonitor loadMonitor,
ExecutorService goalOptimizerExecutor,
GoalOptimizer goalOptimizer,
PersistedMapFactory persistedMapFactory,
Provisioner provisioner) {
_config = config;
_time = time;
_adminClient = createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
_anomalyDetectorManager = anomalyDetectorManager;
_persistedMapFactory = persistedMapFactory;
_executor = executor;
_loadMonitor = loadMonitor;
_goalOptimizerExecutor = goalOptimizerExecutor;
Expand Down Expand Up @@ -468,9 +475,9 @@ public boolean dropRecentBrokers(Set<Integer> brokersToDrop, boolean isRemoved)
*/
/**
 * Permanently records the given broker ids in the executor's persisted tracking set.
 *
 * @param brokersToAdd Broker ids to persist.
 * @param isRemoved {@code true} to add the brokers to the recently-removed set,
 *                  {@code false} to add them to the recently-demoted set.
 */
public void addRecentBrokersPermanently(Set<Integer> brokersToAdd, boolean isRemoved) {
  // Guard-clause form: handle the removed case and return early.
  if (isRemoved) {
    _executor.addRecentlyRemovedBrokersPermanently(brokersToAdd);
    return;
  }
  _executor.addRecentlyDemotedBrokersPermanently(brokersToAdd);
}

Expand Down
Loading