Skip to content

Commit

Permalink
Add min.insync.replicas config for sample store and maintanence topics
Browse files Browse the repository at this point in the history
Signed-off-by: Kyle Liberti <[email protected]>
  • Loading branch information
kyguy committed Jan 3, 2024
1 parent fa21067 commit eb4a303
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,25 @@ public static Config describeClusterConfigs(AdminClient adminClient, Duration ti
* @param topic The name of the topic.
* @param partitionCount Desired partition count.
* @param replicationFactor Desired replication factor.
* @param minInSyncReplicas Desired min insync replicas count.
* @param retentionMs Desired retention in milliseconds.
* @return A wrapper around the topic with the given desired properties.
*/
public static NewTopic wrapTopic(String topic, int partitionCount, short replicationFactor, long retentionMs) {
if (partitionCount <= 0 || replicationFactor <= 0 || retentionMs <= 0) {
throw new IllegalArgumentException(String.format("Partition count (%d), replication factor (%d), and retention ms (%d)"
+ " must be positive for the topic (%s).", partitionCount,
replicationFactor, retentionMs, topic));
public static NewTopic wrapTopic(String topic, int partitionCount, short replicationFactor, short minInSyncReplicas, long retentionMs) {
if (partitionCount <= 0 || replicationFactor <= 0 || retentionMs <= 0 || minInSyncReplicas <= 0) {
throw new IllegalArgumentException(String.format("Partition count (%d), replication factor (%d), min insync replicas (%s),"
+ " and retention ms (%d) must be positive for the topic (%s).", partitionCount,
replicationFactor, minInSyncReplicas, retentionMs, topic));
}
if (replicationFactor < minInSyncReplicas) {
throw new IllegalArgumentException(String.format("Replication factor (%d) should be greater than or equal to"
+ " min insync replicas (%s) for the topic (%s).",
replicationFactor, minInSyncReplicas, topic));
}

NewTopic newTopic = new NewTopic(topic, partitionCount, replicationFactor);
Map<String, String> config = new HashMap<>();
config.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Short.toString(minInSyncReplicas));
config.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(retentionMs));
config.put(TopicConfig.CLEANUP_POLICY_CONFIG, DEFAULT_CLEANUP_POLICY);
newTopic.configs(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
* from (default: {@link #DEFAULT_MAINTENANCE_EVENT_TOPIC}).</li>
* <li>{@link #MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG}: The config for the replication factor of the maintenance
* event topic (default: min({@link #DEFAULT_MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR}, broker-count-in-the-cluster)).</li>
* <li>{@link #MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG}: The config for the min insync replicas count of the maintenance
* event topic (default: {@link #DEFAULT_MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS}).</li>
* <li>{@link #MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG}: The config for the partition count of the maintenance
* event topic (default: {@link #DEFAULT_MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT}).</li>
* <li>{@link #MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG}: The config for the retention of the maintenance event topic
Expand Down Expand Up @@ -81,6 +83,8 @@ public class MaintenanceEventTopicReader implements MaintenanceEventReader {
public static final String DEFAULT_MAINTENANCE_EVENT_TOPIC = "__MaintenanceEvent";
public static final String MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG = "maintenance.event.topic.replication.factor";
public static final short DEFAULT_MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR = 2;
public static final String MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG = "maintenance.event.topic.min.insync.replicas";
public static final short DEFAULT_MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS = 1;
public static final String MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG = "maintenance.event.topic.partition.count";
public static final int DEFAULT_MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT = 8;
public static final String MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG = "maintenance.event.topic.retention.ms";
Expand Down Expand Up @@ -294,6 +298,12 @@ protected static short maintenanceEventTopicReplicationFactor(Map<String, ?> con
return Short.parseShort(maintenanceEventTopicRF);
}

protected static short maintenanceEventTopicMinInSyncReplicas(Map<String, ?> config) {
String maintenanceEventTopicMinIsr = (String) config.get(MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG);
return maintenanceEventTopicMinIsr == null || maintenanceEventTopicMinIsr.isEmpty()
? DEFAULT_MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS : Short.parseShort(maintenanceEventTopicMinIsr);
}

protected static long maintenanceEventTopicRetentionMs(Map<String, ?> config) {
String maintenanceEventTopicRetentionMs = (String) config.get(MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG);
return maintenanceEventTopicRetentionMs == null || maintenanceEventTopicRetentionMs.isEmpty()
Expand All @@ -310,10 +320,11 @@ protected static int maintenanceEventTopicPartitionCount(Map<String, ?> config)
protected void ensureTopicCreated(Map<String, ?> config) {
AdminClient adminClient = _kafkaCruiseControl.adminClient();
short replicationFactor = maintenanceEventTopicReplicationFactor(config, adminClient);
short minInSyncReplicas = maintenanceEventTopicMinInSyncReplicas(config);
long retentionMs = maintenanceEventTopicRetentionMs(config);
int partitionCount = maintenanceEventTopicPartitionCount(config);

NewTopic maintenanceEventTopic = wrapTopic(_maintenanceEventTopic, partitionCount, replicationFactor, retentionMs);
NewTopic maintenanceEventTopic = wrapTopic(_maintenanceEventTopic, partitionCount, replicationFactor, minInSyncReplicas, retentionMs);
maybeCreateOrUpdateTopic(adminClient, maintenanceEventTopic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
public abstract class AbstractKafkaSampleStore implements SampleStore {
protected static final Duration PRODUCER_CLOSE_TIMEOUT = Duration.ofMinutes(3);
protected static final short DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR = 2;
protected static final short DEFAULT_SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS = 1;
protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32;

protected volatile boolean _shutdown = false;
protected Short _sampleStoreTopicReplicationFactor;
protected Short _sampleStoreTopicMinInSyncReplicas;
protected Producer<byte[], byte[]> _producer;

protected void createProducer(Map<String, ?> config, String producerClientId) {
Expand Down Expand Up @@ -97,6 +99,18 @@ protected short sampleStoreTopicReplicationFactor(Map<String, ?> config, AdminCl
}
}

/**
* Retrieve the desired min insync replicas of sample store topics.
*
* @return Desired min insync replicas of sample store topics, or {@code null} if failed to resolve min insync replicas.
*/
protected short sampleStoreTopicMinInsyncReplicas() {
if (_sampleStoreTopicMinInSyncReplicas != null) {
return _sampleStoreTopicMinInSyncReplicas;
}
return DEFAULT_SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS;
}

protected void ensureTopicCreated(AdminClient adminClient, NewTopic sampleStoreTopic) {
if (!createTopic(adminClient, sampleStoreTopic)) {
// Update topic config and partition count to ensure desired properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ protected void ensureTopicCreated(Map<String, ?> config, int topicPartitionCount
AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient((Map<String, Object>) config);
try {
short replicationFactor = sampleStoreTopicReplicationFactor(config, adminClient);
short minInSyncReplicas = sampleStoreTopicMinInsyncReplicas();

// New topics
NewTopic partitionSampleStoreNewTopic = wrapTopic(_partitionMetricSampleStoreTopic, topicPartitionCount,
replicationFactor, topicRetentionTimeMs);
replicationFactor, minInSyncReplicas, topicRetentionTimeMs);
ensureTopicCreated(adminClient, partitionSampleStoreNewTopic);
} finally {
KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class KafkaSampleStore extends AbstractKafkaSampleStore {
public static final String BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG = "broker.metric.sample.store.topic";
public static final String NUM_SAMPLE_LOADING_THREADS_CONFIG = "num.sample.loading.threads";
public static final String SAMPLE_STORE_TOPIC_REPLICATION_FACTOR_CONFIG = "sample.store.topic.replication.factor";
public static final String SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG = "sample.store.topic.min.insync.replicas";
public static final String PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG = "partition.sample.store.topic.partition.count";
public static final String BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG = "broker.sample.store.topic.partition.count";
public static final String MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG = "min.partition.sample.store.topic.retention.time.ms";
Expand All @@ -108,6 +109,11 @@ public void configure(Map<String, ?> config) {
_sampleStoreTopicReplicationFactor = metricSampleStoreTopicReplicationFactorString == null
|| metricSampleStoreTopicReplicationFactorString.isEmpty()
? null : Short.parseShort(metricSampleStoreTopicReplicationFactorString);
String metricSampleStoreMinInSyncReplicasString = (String) config.get(SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG);
_sampleStoreTopicMinInSyncReplicas = metricSampleStoreMinInSyncReplicasString == null
|| metricSampleStoreMinInSyncReplicasString.isEmpty()
? DEFAULT_SAMPLE_STORE_TOPIC_MIN_IN_SYNC_REPLICAS
: Short.parseShort(metricSampleStoreMinInSyncReplicasString);
String partitionSampleStoreTopicPartitionCountString = (String) config.get(PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG);
_partitionSampleStoreTopicPartitionCount = partitionSampleStoreTopicPartitionCountString == null
|| partitionSampleStoreTopicPartitionCountString.isEmpty()
Expand Down Expand Up @@ -149,6 +155,7 @@ protected void ensureTopicsCreated(Map<String, ?> config) {
AdminClient adminClient = KafkaCruiseControlUtils.createAdminClient((Map<String, Object>) config);
try {
short replicationFactor = sampleStoreTopicReplicationFactor(config, adminClient);
short minInSyncReplicas = sampleStoreTopicMinInsyncReplicas();

// Retention
long partitionSampleWindowMs = (Long) config.get(MonitorConfig.PARTITION_METRICS_WINDOW_MS_CONFIG);
Expand All @@ -164,9 +171,9 @@ protected void ensureTopicsCreated(Map<String, ?> config) {

// New topics
NewTopic partitionSampleStoreNewTopic = wrapTopic(_partitionMetricSampleStoreTopic, _partitionSampleStoreTopicPartitionCount,
replicationFactor, partitionSampleRetentionMs);
replicationFactor, minInSyncReplicas, partitionSampleRetentionMs);
NewTopic brokerSampleStoreNewTopic = wrapTopic(_brokerMetricSampleStoreTopic, _brokerSampleStoreTopicPartitionCount,
replicationFactor, brokerSampleRetentionMs);
replicationFactor, minInSyncReplicas, brokerSampleRetentionMs);

ensureTopicCreated(adminClient, partitionSampleStoreNewTopic);
ensureTopicCreated(adminClient, brokerSampleStoreNewTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


public class KafkaCruiseControlUtilsTest {
private static final NewTopic TEST_TOPIC = wrapTopic("mock-topic", 10, (short) 3, TimeUnit.MINUTES.toMillis(10));
private static final NewTopic TEST_TOPIC = wrapTopic("mock-topic", 10, (short) 3, (short) 2, TimeUnit.MINUTES.toMillis(10));

@Test
public void testCreateTopic() throws ExecutionException, InterruptedException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG;
import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.DEFAULT_MAINTENANCE_PLAN_EXPIRATION_MS;
import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_CONFIG;
import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG;
import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG;
import static com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventTopicReader.MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG;
Expand All @@ -52,6 +53,7 @@
public class MaintenanceEventTopicReaderTest extends CruiseControlIntegrationTestHarness {
private static final String TEST_TOPIC = "__CloudMaintenanceEvent";
private static final String TEST_TOPIC_REPLICATION_FACTOR = "1";
private static final String TEST_TOPIC_MIN_INSYNC_REPLICAS = "1";
private static final String TEST_TOPIC_PARTITION_COUNT = "8";
private static final String TEST_TOPIC_RETENTION_TIME_MS = Long.toString(TimeUnit.HOURS.toMillis(1));
private static final String RETENTION_MS_CONFIG = "retention.ms";
Expand Down Expand Up @@ -134,6 +136,7 @@ public void teardown() {
protected Map<String, Object> withConfigs() {
return Map.of(MAINTENANCE_EVENT_TOPIC_CONFIG, TEST_TOPIC,
MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG, TEST_TOPIC_REPLICATION_FACTOR,
MAINTENANCE_EVENT_TOPIC_MIN_IN_SYNC_REPLICAS_CONFIG, TEST_TOPIC_MIN_INSYNC_REPLICAS,
MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG, TEST_TOPIC_PARTITION_COUNT,
MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG, TEST_TOPIC_RETENTION_TIME_MS,
AnomalyDetectorConfig.MAINTENANCE_EVENT_READER_CLASS_CONFIG, MaintenanceEventTopicReader.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeUpdateTopicConfig;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.wrapTopic;
import static kafka.log.LogConfig.CleanupPolicyProp;
import static kafka.log.LogConfig.MinInSyncReplicasProp;
import static kafka.log.LogConfig.RetentionMsProp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -48,6 +49,7 @@ public class SamplingUtilsTest {
private static final int MOCK_PARTITION_COUNT = 1;
private static final int MOCK_DESIRED_PARTITION_COUNT = 2;
private static final short MOCK_REPLICATION_FACTOR = 3;
private static final short MOCK_MIN_IN_SYNC_REPLICAS = 2;
private static final long MOCK_DESIRED_RETENTION_MS = TimeUnit.SECONDS.toMillis(10);
private static final String MOCK_CURRENT_RETENTION_MS = "100";
private static final ConfigResource MOCK_TOPIC_RESOURCE = new ConfigResource(ConfigResource.Type.TOPIC, MOCK_TOPIC);
Expand All @@ -66,12 +68,15 @@ public void testMaybeUpdateTopicConfig() throws InterruptedException, ExecutionE
Map<ConfigResource, KafkaFuture<Void>> alterConfigsValues = Collections.singletonMap(MOCK_TOPIC_RESOURCE,
EasyMock.createMock(KafkaFuture.class));

NewTopic topicToUpdateConfigs = wrapTopic(MOCK_TOPIC, MOCK_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_DESIRED_RETENTION_MS);
NewTopic topicToUpdateConfigs = wrapTopic(MOCK_TOPIC, MOCK_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_MIN_IN_SYNC_REPLICAS,
MOCK_DESIRED_RETENTION_MS);
EasyMock.expect(adminClient.describeConfigs(EasyMock.eq(Collections.singleton(MOCK_TOPIC_RESOURCE)))).andReturn(describeConfigsResult);
EasyMock.expect(describeConfigsResult.values()).andReturn(describeConfigsValues);
EasyMock.expect(describedConfigsFuture.get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)).andReturn(topicConfig);
EasyMock.expect(topicConfig.get(EasyMock.eq(CleanupPolicyProp()))).andReturn(new ConfigEntry(CleanupPolicyProp(),
DEFAULT_CLEANUP_POLICY));
EasyMock.expect(topicConfig.get(EasyMock.eq(MinInSyncReplicasProp()))).andReturn(new ConfigEntry(MinInSyncReplicasProp(),
Short.toString(MOCK_MIN_IN_SYNC_REPLICAS)));
EasyMock.expect(topicConfig.get(EasyMock.eq(RetentionMsProp()))).andReturn(new ConfigEntry(RetentionMsProp(),
MOCK_CURRENT_RETENTION_MS));
EasyMock.expect(adminClient.incrementalAlterConfigs(EasyMock.eq(Collections.singletonMap(MOCK_TOPIC_RESOURCE,
Expand All @@ -88,7 +93,8 @@ public void testMaybeUpdateTopicConfig() throws InterruptedException, ExecutionE
@Test
public void testMaybeIncreasePartitionCount() throws InterruptedException, ExecutionException, TimeoutException {
AdminClient adminClient = EasyMock.createMock(AdminClient.class);
NewTopic topicToAddPartitions = wrapTopic(MOCK_TOPIC, MOCK_DESIRED_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_DESIRED_RETENTION_MS);
NewTopic topicToAddPartitions = wrapTopic(MOCK_TOPIC, MOCK_DESIRED_PARTITION_COUNT, MOCK_REPLICATION_FACTOR, MOCK_MIN_IN_SYNC_REPLICAS,
MOCK_DESIRED_RETENTION_MS);
DescribeTopicsResult describeTopicsResult = EasyMock.createMock(DescribeTopicsResult.class);
KafkaFuture<TopicDescription> topicDescriptionFuture = EasyMock.createMock(KafkaFuture.class);
TopicDescription topicDescription = EasyMock.createMock(TopicDescription.class);
Expand Down
Loading

0 comments on commit eb4a303

Please sign in to comment.