Persist demoted and removed brokers
* Simplified KafkaZkClient creation for debugging.
* Added persistence for demoted and removed brokers.
* Added tests and conformed to lint tests.
morgangalpin committed Jan 29, 2024
1 parent 92c5b4a commit 5ae4d83
Showing 30 changed files with 2,985 additions and 197 deletions.
46 changes: 46 additions & 0 deletions config/cruisecontrol.properties
@@ -198,6 +198,52 @@ num.concurrent.leader.movements=1000
execution.progress.check.interval.ms=10000


# Configurations for persistent data
# =======================================

# The method used to store persisted data. This is the first "persisted.data" config to set, and its
# value determines which of the other configs in this series need to be configured. The available
# options are:
# "kafka",
# "memory"
# The default is "memory", which does not durably persist any runtime data.
#persisted.data.backing.method=kafka

# The name of the Kafka topic used to persist data when "persisted.data.backing.method" is set to
# "kafka". If the topic is not present, it will be created.
#persisted.data.kafka.topic.name=__CruiseControlPersistentData

# The number of partitions to ensure are present for the Kafka topic. Only applies when
# "persisted.data.backing.method" is set to "kafka". If the topic has fewer than this number of
# partitions, then partitions will be added.
#persisted.data.kafka.topic.partition.count=2

# The replication factor to use for the Kafka topic. Only applies when
# "persisted.data.backing.method" is set to "kafka". Multiple partition replicas are desirable to
# ensure the topic is reasonably available.
#persisted.data.kafka.topic.replication.factor=2

# The configs to apply to the Kafka topic used to persist Cruise Control data. Only applies if
# "persisted.data.backing.method" is set to "kafka". This "list" should be a semicolon-separated
# string of 'key=value' pairs. The keys and values need to be valid Kafka topic configs.
# See: https://kafka.apache.org/documentation/#topicconfigs
# e.g. persisted.data.kafka.topic.additional.configs.map=max.compaction.lag.ms=600000;min.insync.replicas=2
#persisted.data.kafka.topic.additional.configs.map=

# The additional configs to use when creating the Kafka producer that persists Cruise Control data.
# Only applies if "persisted.data.backing.method" is set to "kafka". This "list" should be a
# semicolon-separated string of 'key=value' pairs. The keys and values need to be valid Kafka
# producer configs. See: https://kafka.apache.org/documentation/#producerconfigs
# e.g. persisted.data.kafka.producer.additional.configs.map=buffer.memory=100000000;batch.size=900000
#persisted.data.kafka.producer.additional.configs.map=

# The additional configs to use when creating the Kafka consumer that reads persisted Cruise Control
# data. Only applies if "persisted.data.backing.method" is set to "kafka". This "list" should be a
# semicolon-separated string of 'key=value' pairs. The keys and values need to be valid Kafka
# consumer configs. See: https://kafka.apache.org/documentation/#consumerconfigs
# e.g. persisted.data.kafka.consumer.additional.configs.map=group.id=my-group-id;fetch.max.bytes=2000000
#persisted.data.kafka.consumer.additional.configs.map=


# Configurations for anomaly detector
# =======================================

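Taken together, the new persisted-data settings above can be illustrated with a Kafka-backed configuration. The snippet below is only an illustration assembled from the commented defaults and examples in the file; the replication factor and topic configs shown are arbitrary choices, not recommendations:

persisted.data.backing.method=kafka
persisted.data.kafka.topic.name=__CruiseControlPersistentData
persisted.data.kafka.topic.partition.count=2
persisted.data.kafka.topic.replication.factor=3
persisted.data.kafka.topic.additional.configs.map=min.insync.replicas=2;max.compaction.lag.ms=600000
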
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
@@ -41,6 +41,8 @@
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.linkedin.kafka.cruisecontrol.persisteddata.PersistedMapFactory;
import com.linkedin.kafka.cruisecontrol.persisteddata.namespace.ExecutorPersistedData;
import com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.io.InputStream;
@@ -82,6 +84,7 @@ public class KafkaCruiseControl {
private final GoalOptimizer _goalOptimizer;
private final ExecutorService _goalOptimizerExecutor;
private final Executor _executor;
private final PersistedMapFactory _persistedMapFactory;
private final AnomalyDetectorManager _anomalyDetectorManager;
private final Time _time;
private final AdminClient _adminClient;
@@ -122,7 +125,9 @@ public KafkaCruiseControl(KafkaCruiseControlConfig config, MetricRegistry dropwi
Provisioner.class,
Collections.singletonMap(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, this));
_anomalyDetectorManager = new AnomalyDetectorManager(this, _time, dropwizardMetricRegistry);
_executor = new Executor(config, _time, dropwizardMetricRegistry, _anomalyDetectorManager);
_persistedMapFactory = new PersistedMapFactory(config, _adminClient);
_executor = new Executor(config, _time, dropwizardMetricRegistry, _anomalyDetectorManager,
new ExecutorPersistedData(_persistedMapFactory.instance()));
_loadMonitor = new LoadMonitor(config, _time, dropwizardMetricRegistry, KafkaMetricDef.commonMetricDef());
_goalOptimizerExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("GoalOptimizerExecutor"));
_goalOptimizer = new GoalOptimizer(config, _loadMonitor, _time, dropwizardMetricRegistry, _executor, _adminClient);
Expand All @@ -138,11 +143,13 @@ public KafkaCruiseControl(KafkaCruiseControlConfig config, MetricRegistry dropwi
LoadMonitor loadMonitor,
ExecutorService goalOptimizerExecutor,
GoalOptimizer goalOptimizer,
PersistedMapFactory persistedMapFactory,
Provisioner provisioner) {
_config = config;
_time = time;
_adminClient = createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
_anomalyDetectorManager = anomalyDetectorManager;
_persistedMapFactory = persistedMapFactory;
_executor = executor;
_loadMonitor = loadMonitor;
_goalOptimizerExecutor = goalOptimizerExecutor;
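
The constructor changes above build a PersistedMapFactory from the Cruise Control config and the AdminClient, then hand the map it produces to the Executor wrapped in an ExecutorPersistedData. The sketch below shows one way such a factory could choose its backing store from "persisted.data.backing.method"; the class name and behavior here are assumptions for illustration, not the commit's actual implementation, which also provides the Kafka-backed map.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the commit's actual code: pick the backing store for
// persisted runtime data from the "persisted.data.backing.method" config value.
public final class PersistedMapFactorySketch {

    private final String _backingMethod;

    public PersistedMapFactorySketch(String backingMethod) {
        _backingMethod = backingMethod == null ? "memory" : backingMethod;
    }

    // Returns the map that holds runtime state such as removed/demoted broker ids.
    // With the "kafka" method the real factory presumably returns a map whose
    // mutations are written to the configured Kafka topic; this sketch only models
    // the "memory" default, which does not survive a restart.
    public Map<String, String> instance() {
        if ("kafka".equalsIgnoreCase(_backingMethod)) {
            throw new UnsupportedOperationException(
                "Kafka-backed persistence is implemented by the commit's real factory.");
        }
        return new ConcurrentHashMap<>();
    }
}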
@@ -468,9 +475,9 @@ public boolean dropRecentBrokers(Set&lt;Integer&gt; brokersToDrop, boolean isRemoved)
*/
public void addRecentBrokersPermanently(Set<Integer> brokersToAdd, boolean isRemoved) {
if (isRemoved) {
_executor.addRecentlyRemovedBrokers(brokersToAdd);
_executor.addRecentlyRemovedBrokersPermanently(brokersToAdd);
} else {
_executor.addRecentlyDemotedBrokers(brokersToAdd);
_executor.addRecentlyDemotedBrokersPermanently(brokersToAdd);
}
}
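
The executor methods are renamed from addRecentlyRemovedBrokers/addRecentlyDemotedBrokers to their Permanently variants, signalling that these broker sets are now written through to the persisted map rather than held only in memory. A rough sketch of that write-through pattern follows; the class, field, and key names are illustrative assumptions, and the commit's actual serialization may differ.

import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Illustrative write-through pattern: keep a fast in-memory set for lookups and
// mirror every permanent change into the persisted map so that a restarted
// instance can restore it.
public final class RecentBrokersSketch {

    private static final String REMOVED_BROKERS_KEY = "removed.brokers"; // illustrative key

    private final Set<Integer> _recentlyRemovedBrokers = new TreeSet<>();
    private final Map<String, String> _persistedData;

    public RecentBrokersSketch(Map<String, String> persistedData) {
        _persistedData = persistedData;
    }

    public synchronized void addRecentlyRemovedBrokersPermanently(Set<Integer> brokers) {
        _recentlyRemovedBrokers.addAll(brokers);
        // Serialize the whole set so it can be reloaded after a restart.
        _persistedData.put(REMOVED_BROKERS_KEY, _recentlyRemovedBrokers.stream()
            .map(String::valueOf)
            .collect(Collectors.joining(",")));
    }

    public synchronized Set<Integer> recentlyRemovedBrokers() {
        return new TreeSet<>(_recentlyRemovedBrokers);
    }
}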
