From d021c7f5f862c7b479ff702c7869f6ec230aa8a7 Mon Sep 17 00:00:00 2001 From: Kyle Liberti Date: Wed, 3 Jan 2024 17:11:04 -0500 Subject: [PATCH] Add min.insync.replicas config for sample store and maintanence topics Signed-off-by: Kyle Liberti --- .../KafkaCruiseControlUtils.java | 17 +++++++--- .../detector/MaintenanceEventTopicReader.java | 13 ++++++- .../sampling/AbstractKafkaSampleStore.java | 14 ++++++++ ...PartitionMetricSampleOnExecutionStore.java | 3 +- .../monitor/sampling/KafkaSampleStore.java | 11 ++++-- .../KafkaCruiseControlUtilsTest.java | 2 +- .../MaintenanceEventTopicReaderTest.java | 3 ++ .../monitor/sampling/SamplingUtilsTest.java | 10 ++++-- docs/wiki/User Guide/Configurations.md | 34 ++++++++++--------- 9 files changed, 79 insertions(+), 28 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java index 50d6f53972..9d36b04fa3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java @@ -231,18 +231,25 @@ public static Config describeClusterConfigs(AdminClient adminClient, Duration ti * @param topic The name of the topic. * @param partitionCount Desired partition count. * @param replicationFactor Desired replication factor. + * @param minInSyncReplicas Desired min insync replicas count. * @param retentionMs Desired retention in milliseconds. * @return A wrapper around the topic with the given desired properties. */ - public static NewTopic wrapTopic(String topic, int partitionCount, short replicationFactor, long retentionMs) { - if (partitionCount <= 0 || replicationFactor <= 0 || retentionMs <= 0) { - throw new IllegalArgumentException(String.format("Partition count (%d), replication factor (%d), and retention ms (%d)" - + " must be positive for the topic (%s).", partitionCount, - replicationFactor, retentionMs, topic)); + public static NewTopic wrapTopic(String topic, int partitionCount, short replicationFactor, short minInSyncReplicas, long retentionMs) { + if (partitionCount <= 0 || replicationFactor <= 0 || retentionMs <= 0 || minInSyncReplicas <= 0) { + throw new IllegalArgumentException(String.format("Partition count (%d), replication factor (%d), min insync replicas (%s)," + + " and retention ms (%d) must be positive for the topic (%s).", partitionCount, + replicationFactor, minInSyncReplicas, retentionMs, topic)); + } + if (replicationFactor < minInSyncReplicas) { + throw new IllegalArgumentException(String.format("Replication factor (%d) should be greater than or equal to" + + " min insync replicas (%s) for the topic (%s).", + replicationFactor, minInSyncReplicas, topic)); } NewTopic newTopic = new NewTopic(topic, partitionCount, replicationFactor); Map config = new HashMap<>(); + config.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Short.toString(minInSyncReplicas)); config.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(retentionMs)); config.put(TopicConfig.CLEANUP_POLICY_CONFIG, DEFAULT_CLEANUP_POLICY); newTopic.configs(config); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTopicReader.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTopicReader.java index c9bc2cd2bb..beb5bb9c5a 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTopicReader.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTopicReader.java @@ -54,6 +54,8 @@ * from (default: {@link #DEFAULT_MAINTENANCE_EVENT_TOPIC}). *
  • {@link #MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG}: The config for the replication factor of the maintenance * event topic (default: min({@link #DEFAULT_MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR}, broker-count-in-the-cluster)).
  • + *
  • {@link #MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG}: The config for the min insync replicas count of the maintenance + * event topic (default: {@link #DEFAULT_MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS}).
  • *
  • {@link #MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG}: The config for the partition count of the maintenance * event topic (default: {@link #DEFAULT_MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT}).
  • *
  • {@link #MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG}: The config for the retention of the maintenance event topic @@ -81,6 +83,8 @@ public class MaintenanceEventTopicReader implements MaintenanceEventReader { public static final String DEFAULT_MAINTENANCE_EVENT_TOPIC = "__MaintenanceEvent"; public static final String MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG = "maintenance.event.topic.replication.factor"; public static final short DEFAULT_MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR = 2; + public static final String MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG = "maintenance.event.topic.min.insync.replicas"; + public static final short DEFAULT_MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS = 1; public static final String MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG = "maintenance.event.topic.partition.count"; public static final int DEFAULT_MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT = 8; public static final String MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG = "maintenance.event.topic.retention.ms"; @@ -294,6 +298,12 @@ protected static short maintenanceEventTopicReplicationFactor(Map con return Short.parseShort(maintenanceEventTopicRF); } + protected static short maintenanceEventTopicMinInSyncReplicas(Map config) { + String maintenanceEventTopicMinIsr = (String) config.get(MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG); + return maintenanceEventTopicMinIsr == null || maintenanceEventTopicMinIsr.isEmpty() + ? DEFAULT_MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS : Short.parseShort(maintenanceEventTopicMinIsr); + } + protected static long maintenanceEventTopicRetentionMs(Map config) { String maintenanceEventTopicRetentionMs = (String) config.get(MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG); return maintenanceEventTopicRetentionMs == null || maintenanceEventTopicRetentionMs.isEmpty() @@ -310,10 +320,11 @@ protected static int maintenanceEventTopicPartitionCount(Map config) protected void ensureTopicCreated(Map config) { AdminClient adminClient = _kafkaCruiseControl.adminClient(); short replicationFactor = maintenanceEventTopicReplicationFactor(config, adminClient); + short minInSyncReplicas = maintenanceEventTopicMinInSyncReplicas(config); long retentionMs = maintenanceEventTopicRetentionMs(config); int partitionCount = maintenanceEventTopicPartitionCount(config); - NewTopic maintenanceEventTopic = wrapTopic(_maintenanceEventTopic, partitionCount, replicationFactor, retentionMs); + NewTopic maintenanceEventTopic = wrapTopic(_maintenanceEventTopic, partitionCount, replicationFactor, minInSyncReplicas, retentionMs); maybeCreateOrUpdateTopic(adminClient, maintenanceEventTopic); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/AbstractKafkaSampleStore.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/AbstractKafkaSampleStore.java index fbac5c4ffe..21ee5ffe83 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/AbstractKafkaSampleStore.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/AbstractKafkaSampleStore.java @@ -32,10 +32,12 @@ public abstract class AbstractKafkaSampleStore implements SampleStore { protected static final Duration PRODUCER_CLOSE_TIMEOUT = Duration.ofMinutes(3); protected static final short DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR = 2; + protected static final short DEFAULT_SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS = 1; protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32; protected volatile boolean _shutdown = false; protected Short _sampleStoreTopicReplicationFactor; + protected Short _sampleStoreTopicMinInSyncReplicas; protected Producer _producer; protected void createProducer(Map config, String producerClientId) { @@ -97,6 +99,18 @@ protected short sampleStoreTopicReplicationFactor(Map config, AdminCl } } + /** + * Retrieve the desired min insync replicas of sample store topics. + * + * @return Desired min insync replicas of sample store topics, or {@code null} if failed to resolve min insync replicas. + */ + protected short sampleStoreTopicMinInsyncReplicas() { + if (_sampleStoreTopicMinInSyncReplicas != null) { + return _sampleStoreTopicMinInSyncReplicas; + } + return DEFAULT_SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS; + } + protected void ensureTopicCreated(AdminClient adminClient, NewTopic sampleStoreTopic) { if (!createTopic(adminClient, sampleStoreTopic)) { // Update topic config and partition count to ensure desired properties. diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleOnExecutionStore.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleOnExecutionStore.java index c56572e181..b9c2d13fea 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleOnExecutionStore.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleOnExecutionStore.java @@ -76,10 +76,11 @@ protected void ensureTopicCreated(Map config, int topicPartitionCount AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient((Map) config); try { short replicationFactor = sampleStoreTopicReplicationFactor(config, adminClient); + short minInSyncReplicas = sampleStoreTopicMinInsyncReplicas(); // New topics NewTopic partitionSampleStoreNewTopic = wrapTopic(_partitionMetricSampleStoreTopic, topicPartitionCount, - replicationFactor, topicRetentionTimeMs); + replicationFactor, minInSyncReplicas, topicRetentionTimeMs); ensureTopicCreated(adminClient, partitionSampleStoreNewTopic); } finally { KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaSampleStore.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaSampleStore.java index 1b70218d0e..7f4b7ba3a2 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaSampleStore.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaSampleStore.java @@ -95,6 +95,7 @@ public class KafkaSampleStore extends AbstractKafkaSampleStore { public static final String BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG = "broker.metric.sample.store.topic"; public static final String NUM_SAMPLE_LOADING_THREADS_CONFIG = "num.sample.loading.threads"; public static final String SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG = "sample.store.topic.replication.factor"; + public static final String SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG = "sample.store.topic.min.insync.replicas"; public static final String PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG = "partition.sample.store.topic.partition.count"; public static final String BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG = "broker.sample.store.topic.partition.count"; public static final String MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG = "min.partition.sample.store.topic.retention.time.ms"; @@ -108,6 +109,11 @@ public void configure(Map config) { _sampleStoreTopicReplicationFactor = metricSampleStoreTopicReplicationFactorString == null || metricSampleStoreTopicReplicationFactorString.isEmpty() ? null : Short.parseShort(metricSampleStoreTopicReplicationFactorString); + String metricSampleStoreMinInSyncReplicasString = (String) config.get(SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG); + _sampleStoreTopicMinInSyncReplicas = metricSampleStoreMinInSyncReplicasString == null + || metricSampleStoreMinInSyncReplicasString.isEmpty() + ? DEFAULT_SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS + : Short.parseShort(metricSampleStoreMinInSyncReplicasString); String partitionSampleStoreTopicPartitionCountString = (String) config.get(PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG); _partitionSampleStoreTopicPartitionCount = partitionSampleStoreTopicPartitionCountString == null || partitionSampleStoreTopicPartitionCountString.isEmpty() @@ -149,6 +155,7 @@ protected void ensureTopicsCreated(Map config) { AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient((Map) config); try { short replicationFactor = sampleStoreTopicReplicationFactor(config, adminClient); + short minInSyncReplicas = sampleStoreTopicMinInsyncReplicas(); // Retention long partitionSampleWindowMs = (Long) config.get(MonitorConfig.PARTITION_METRICS_WINDOW_MS_CONFIG); @@ -164,9 +171,9 @@ protected void ensureTopicsCreated(Map config) { // New topics NewTopic partitionSampleStoreNewTopic = wrapTopic(_partitionMetricSampleStoreTopic, _partitionSampleStoreTopicPartitionCount, - replicationFactor, partitionSampleRetentionMs); + replicationFactor, minInSyncReplicas, partitionSampleRetentionMs); NewTopic brokerSampleStoreNewTopic = wrapTopic(_brokerMetricSampleStoreTopic, _brokerSampleStoreTopicPartitionCount, - replicationFactor, brokerSampleRetentionMs); + replicationFactor, minInSyncReplicas, brokerSampleRetentionMs); ensureTopicCreated(adminClient, partitionSampleStoreNewTopic); ensureTopicCreated(adminClient, brokerSampleStoreNewTopic); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtilsTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtilsTest.java index baa92e7ac2..2f22fe9675 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtilsTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtilsTest.java @@ -25,7 +25,7 @@ public class KafkaCruiseControlUtilsTest { - private static final NewTopic TEST_TOPIC = wrapTopic("mock-topic", 10, (short) 3, TimeUnit.MINUTES.toMillis(10)); + private static final NewTopic TEST_TOPIC = wrapTopic("mock-topic", 10, (short) 3, (short) 2, TimeUnit.MINUTES.toMillis(10)); @Test public void testCreateTopic() throws ExecutionException, InterruptedException, TimeoutException { diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTopicReaderTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTopicReaderTest.java index e12d0655ee..50f275c8b6 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTopicReaderTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTopicReaderTest.java @@ -39,6 +39,7 @@ import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG; import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.DEFAULT_MAINTENANCE_PLAN_EXPIRATION_MS; import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_CONFIG; +import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG; import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG; import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG; import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG; @@ -52,6 +53,7 @@ public class MaintenanceEventTopicReaderTest extends CruiseControlIntegrationTestHarness { private static final String TEST_TOPIC = "__CloudMaintenanceEvent"; private static final String TEST_TOPIC_REPLICATION_FACTOR = "1"; + private static final String TEST_TOPIC_MIN_INSYNC_REPLICAS = "1"; private static final String TEST_TOPIC_PARTITION_COUNT = "8"; private static final String TEST_TOPIC_RETENTION_TIME_MS = Long.toString(TimeUnit.HOURS.toMillis(1)); private static final String RETENTION_MS_CONFIG = "retention.ms"; @@ -134,6 +136,7 @@ public void teardown() { protected Map withConfigs() { return Map.of(MAINTENANCE_EVENT_TOPIC_CONFIG, TEST_TOPIC, MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG, TEST_TOPIC_REPLICATION_FACTOR, + MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG, TEST_TOPIC_MIN_INSYNC_REPLICAS, MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG, TEST_TOPIC_PARTITION_COUNT, MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG, TEST_TOPIC_RETENTION_TIME_MS, AnomalyDetectorConfig.MAINTENANCE_EVENT_READER_CLASS_CONFIG, MaintenanceEventTopicReader.class.getName()); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingUtilsTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingUtilsTest.java index 25498d2d1e..9b55d33939 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingUtilsTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingUtilsTest.java @@ -35,6 +35,7 @@ import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeUpdateTopicConfig; import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.wrapTopic; import static kafka.log.LogConfig.CleanupPolicyProp; +import static kafka.log.LogConfig.MinInSyncReplicasProp; import static kafka.log.LogConfig.RetentionMsProp; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -48,6 +49,7 @@ public class SamplingUtilsTest { private static final int MOCK_PARTITION_COUNT = 1; private static final int MOCK_DESIRED_PARTITION_COUNT = 2; private static final short MOCK_REPLICATION_FACTOR = 3; + private static final short MOCK_MIN_IN_SYNC_REPLICAS = 2; private static final long MOCK_DESIRED_RETENTION_MS = TimeUnit.SECONDS.toMillis(10); private static final String MOCK_CURRENT_RETENTION_MS = "100"; private static final ConfigResource MOCK_TOPIC_RESOURCE = new ConfigResource(ConfigResource.Type.TOPIC, MOCK_TOPIC); @@ -66,12 +68,15 @@ public void testMaybeUpdateTopicConfig() throws InterruptedException, ExecutionE Map> alterConfigsValues = Collections.singletonMap(MOCK_TOPIC_RESOURCE, EasyMock.createMock(KafkaFuture.class)); - NewTopic topicToUpdateConfigs = wrapTopic(MOCK_TOPIC, MOCK_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_DESIRED_RETENTION_MS); + NewTopic topicToUpdateConfigs = wrapTopic(MOCK_TOPIC, MOCK_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_MIN_IN_SYNC_REPLICAS, + MOCK_DESIRED_RETENTION_MS); EasyMock.expect(adminClient.describeConfigs(EasyMock.eq(Collections.singleton(MOCK_TOPIC_RESOURCE)))).andReturn(describeConfigsResult); EasyMock.expect(describeConfigsResult.values()).andReturn(describeConfigsValues); EasyMock.expect(describedConfigsFuture.get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)).andReturn(topicConfig); EasyMock.expect(topicConfig.get(EasyMock.eq(CleanupPolicyProp()))).andReturn(new ConfigEntry(CleanupPolicyProp(), DEFAULT_CLEANUP_POLICY)); + EasyMock.expect(topicConfig.get(EasyMock.eq(MinInSyncReplicasProp()))).andReturn(new ConfigEntry(MinInSyncReplicasProp(), + Short.toString(MOCK_MIN_IN_SYNC_REPLICAS))); EasyMock.expect(topicConfig.get(EasyMock.eq(RetentionMsProp()))).andReturn(new ConfigEntry(RetentionMsProp(), MOCK_CURRENT_RETENTION_MS)); EasyMock.expect(adminClient.incrementalAlterConfigs(EasyMock.eq(Collections.singletonMap(MOCK_TOPIC_RESOURCE, @@ -88,7 +93,8 @@ public void testMaybeUpdateTopicConfig() throws InterruptedException, ExecutionE @Test public void testMaybeIncreasePartitionCount() throws InterruptedException, ExecutionException, TimeoutException { AdminClient adminClient = EasyMock.createMock(AdminClient.class); - NewTopic topicToAddPartitions = wrapTopic(MOCK_TOPIC, MOCK_DESIRED_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_DESIRED_RETENTION_MS); + NewTopic topicToAddPartitions = wrapTopic(MOCK_TOPIC, MOCK_DESIRED_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_MIN_IN_SYNC_REPLICAS, + MOCK_DESIRED_RETENTION_MS); DescribeTopicsResult describeTopicsResult = EasyMock.createMock(DescribeTopicsResult.class); KafkaFuture topicDescriptionFuture = EasyMock.createMock(KafkaFuture.class); TopicDescription topicDescription = EasyMock.createMock(TopicDescription.class); diff --git a/docs/wiki/User Guide/Configurations.md b/docs/wiki/User Guide/Configurations.md index 5100b9cca3..a50f075c27 100644 --- a/docs/wiki/User Guide/Configurations.md +++ b/docs/wiki/User Guide/Configurations.md @@ -300,16 +300,17 @@ We are still trying to improve cruise control. And following are some configurat | prometheus.query.supplier | Class | N | com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.DefaultPrometheusQuerySupplier | The class that supplies the Prometheus queries corresponding to Kafka raw metrics. If there are no customizations done when configuring Prometheus node exporter, the default class should work fine. | | prometheus.broker.metrics.scraping.frequency.seconds | Integer | N | 60 | The scraping frequency with which Prometheus scrapes metrics from Kafka brokers. This value is used by DefaultPrometheusQuerySupplier to construct the iRate query that is used to get broker cpu metrics. | ### KafkaSampleStore configurations -| Name | Type | Required? | Default Value | Description | -|-------------------------------------------------------|---------|-----------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| partition.metric.sample.store.topic | String | Y | | The topic in which Cruise Control will store its processed metric samples as a backup. When Cruise Control is rebooted, it will load the metrics from this topic to populate the load monitor. | -| broker.metric.sample.store.topic | String | Y | | The topic in which Cruise Control will store its broker metric samples as a backup. When Cruise Control is rebooted, it will load the broker metric samples from this topic to train its cluster model. | -| num.sample.loading.threads | Integer | N | 8 | The number of threads to load from the sample store topics | -| sample.store.topic.replication.factor | Integer | N | 2 | The config for the replication factor of Kafka sample store topics | -| partition.sample.store.topic.partition.count | Integer | N | 32 | The config for the number of partition for Kafka partition sample store topic | -| broker.sample.store.topic.partition.count | Integer | N | 32 | The config for the number of partition for Kafka broker sample store topic | -| min.partition.sample.store.topic.retention.time.ms | Integer | N | 3600000 | The config for the minimal retention time for Kafka partition sample store topic | -| min.broker.sample.store.topic.retention.time.ms | Integer | N | 3600000 | The config for the minimal retention time for Kafka broker sample store topic | +| Name | Type | Required? | Default Value | Description | +|----------------------------------------------------|---------|-----------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| partition.metric.sample.store.topic | String | Y | | The topic in which Cruise Control will store its processed metric samples as a backup. When Cruise Control is rebooted, it will load the metrics from this topic to populate the load monitor. | +| broker.metric.sample.store.topic | String | Y | | The topic in which Cruise Control will store its broker metric samples as a backup. When Cruise Control is rebooted, it will load the broker metric samples from this topic to train its cluster model. | +| num.sample.loading.threads | Integer | N | 8 | The number of threads to load from the sample store topics | +| sample.store.topic.replication.factor | Integer | N | 2 | The config for the replication factor of Kafka sample store topics | +| sample.store.topic.min.insync.replicas | Integer | N | 1 | The config for the min insync replicas of Kafka sample store topics | +| partition.sample.store.topic.partition.count | Integer | N | 32 | The config for the number of partition for Kafka partition sample store topic | +| broker.sample.store.topic.partition.count | Integer | N | 32 | The config for the number of partition for Kafka broker sample store topic | +| min.partition.sample.store.topic.retention.time.ms | Integer | N | 3600000 | The config for the minimal retention time for Kafka partition sample store topic | +| min.broker.sample.store.topic.retention.time.ms | Integer | N | 3600000 | The config for the minimal retention time for Kafka broker sample store topic | | ### KafkaPartitionMetricSampleOnExecutionStore configurations @@ -321,12 +322,13 @@ We are still trying to improve cruise control. And following are some configurat | partition.metric.sample.store.on.execution.topic.retention.time.ms | Integer | N | 3600000 | The config for the retention time for Kafka partition metrics sample store during ongoing execution topic. | ### MaintenanceEventTopicReader configurations -| Name | Type | Required? | Default Value | Description | -|-----------------------------------------------|---------|-----------|-------------------------|-------------------------------------------------------------------| -| maintenance.event.topic | String | N | __MaintenanceEvent | The name of the Kafka topic to consume maintenance events from | -| maintenance.event.topic.replication.factor | Short | N | min(2, #alive-brokers) | The replication factor of the maintenance event topic | -| maintenance.event.topic.partition.count | Integer | N | 8 | The partition count of the maintenance event topic | -| maintenance.event.topic.retention.ms | Long | N | 21600000 | The retention of the maintenance event topic | +| Name | Type | Required? | Default Value | Description | +|---------------------------------------------|---------|-----------|------------------------|----------------------------------------------------------------------| +| maintenance.event.topic | String | N | __MaintenanceEvent | The name of the Kafka topic to consume maintenance events from | +| maintenance.event.topic.replication.factor | Short | N | min(2, #alive-brokers) | The replication factor of the maintenance event topic | +| maintenance.event.topic.min.insync.replicas | Short | N | 1 | The minimum number of insync replicas of the maintenance event topic | +| maintenance.event.topic.partition.count | Integer | N | 8 | The partition count of the maintenance event topic | +| maintenance.event.topic.retention.ms | Long | N | 21600000 | The retention of the maintenance event topic | ### BrokerCapacityConfigurationFileResolver configurations | Name | Type | Required? | Default Value | Description |