Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for for reconfiguration of ssl properties for metrics reporter #2206

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,66 +142,91 @@ static String getBootstrapServers(Map<String, ?> configs) {

@Override
public void configure(Map<String, ?> configs) {
CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);

_metricsReporterCreateRetries = reporterConfig.getInt(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_CREATE_RETRIES_CONFIG);

Properties producerProps = buildProducerProperties(configs, reporterConfig);
createCruiseControlMetricsProducer(producerProps);
if (_producer == null) {
this.close();
}

_brokerId = Integer.parseInt((String) configs.get("broker.id"));

_cruiseControlMetricsTopic = reporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG);
_reportingIntervalMs = reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG);
_kubernetesMode = reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG);

if (reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG)) {
try {
_metricsTopic = createMetricsTopicFromReporterConfig(reporterConfig);
Properties adminClientConfigs = CruiseControlMetricsUtils.addSslConfigs(producerProps, reporterConfig);
_adminClient = CruiseControlMetricsUtils.createAdminClient(adminClientConfigs);
_metricsTopicAutoCreateTimeoutMs = reporterConfig.getLong(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_CONFIG);
_metricsTopicAutoCreateRetries = reporterConfig.getInt(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_RETRIES_CONFIG);
} catch (CruiseControlMetricsReporterException e) {
LOG.warn("Cruise Control metrics topic auto creation was disabled", e);
}
}
}

private Properties buildProducerProperties(Map<String, ?> configs, CruiseControlMetricsReporterConfig reporterConfig) {
Properties producerProps = CruiseControlMetricsReporterConfig.parseProducerConfigs(configs);

//Add BootstrapServers if not set
if (!producerProps.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
String bootstrapServers = getBootstrapServers(configs);
producerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
LOG.info("Using default value of {} for {}", bootstrapServers,
CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
}

//Add SecurityProtocol if not set
if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) {
String securityProtocol = "PLAINTEXT";
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
LOG.info("Using default value of {} for {}", securityProtocol,
CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);

setIfAbsent(producerProps,
ProducerConfig.CLIENT_ID_CONFIG,
reporterConfig.getString(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.CLIENT_ID_CONFIG)));
ProducerConfig.CLIENT_ID_CONFIG,
reporterConfig.getString(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.CLIENT_ID_CONFIG)));
setIfAbsent(producerProps, ProducerConfig.LINGER_MS_CONFIG,
reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG).toString());
reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG).toString());
setIfAbsent(producerProps, ProducerConfig.BATCH_SIZE_CONFIG,
reporterConfig.getInt(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_CONFIG).toString());
reporterConfig.getInt(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_CONFIG).toString());
setIfAbsent(producerProps, ProducerConfig.RETRIES_CONFIG, "5");
setIfAbsent(producerProps, ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
setIfAbsent(producerProps, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
setIfAbsent(producerProps, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
setIfAbsent(producerProps, ProducerConfig.ACKS_CONFIG, "all");

_metricsReporterCreateRetries = reporterConfig.getInt(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_CREATE_RETRIES_CONFIG);
return producerProps;
}

@Override
public Set<String> reconfigurableConfigs() {
return CruiseControlMetricsReporterConfig.RECONFIGURABLE_CONFIGS;
}

@Override
public void reconfigure(Map<String, ?> configs) {
if (_producer != null) {
_producer.close();
}

LOG.info("Reconfiguring Cruise Control metrics producer");
CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);
Properties producerProps = buildProducerProperties(configs, reporterConfig);
createCruiseControlMetricsProducer(producerProps);
if (_producer == null) {
this.close();
}

_brokerId = Integer.parseInt((String) configs.get("broker.id"));

_cruiseControlMetricsTopic = reporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG);
_reportingIntervalMs = reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG);
_kubernetesMode = reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG);

if (reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG)) {
try {
_metricsTopic = createMetricsTopicFromReporterConfig(reporterConfig);
Properties adminClientConfigs = CruiseControlMetricsUtils.addSslConfigs(producerProps, reporterConfig);
_adminClient = CruiseControlMetricsUtils.createAdminClient(adminClientConfigs);
_metricsTopicAutoCreateTimeoutMs = reporterConfig.getLong(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_CONFIG);
_metricsTopicAutoCreateRetries = reporterConfig.getInt(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_RETRIES_CONFIG);
} catch (CruiseControlMetricsReporterException e) {
LOG.warn("Cruise Control metrics topic auto creation was disabled", e);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.utils.Utils;

public class CruiseControlMetricsReporterConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
private static final Set<String> CONFIGS = new HashSet<>();
public static final String PREFIX = "cruise.control.metrics.reporter.";
static final Set<String> RECONFIGURABLE_CONFIGS = new HashSet<>();
// Configurations
public static final String CRUISE_CONTROL_METRICS_TOPIC_CONFIG = "cruise.control.metrics.topic";
private static final String CRUISE_CONTROL_METRICS_TOPIC_DOC = "The topic to which Cruise Control metrics reporter "
Expand Down Expand Up @@ -57,6 +61,9 @@ public class CruiseControlMetricsReporterConfig extends AbstractConfig {
public static final String CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG = PREFIX + "kubernetes.mode";
public static final String CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_DOC = "Cruise Control metrics reporter will report "
+ "metrics using methods that are aware of container boundaries.";
public static final String CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG = PREFIX + "force.reconfigure";
public static final String CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG_DOC = "Cruise Control metrics reporter force reconfigure "
+ "the flag. Set it a different value (like the current date) to trigger the reconfiguration.";
// Default values
public static final String DEFAULT_CRUISE_CONTROL_METRICS_TOPIC = "__CruiseControlMetrics";
public static final Integer DEFAULT_CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS = -1;
Expand All @@ -74,6 +81,10 @@ public class CruiseControlMetricsReporterConfig extends AbstractConfig {
public static final boolean DEFAULT_CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE = false;
public static final int DEFAULT_CRUISE_CONTROL_METRICS_REPORTER_CREATE_RETRIES = 2;

public static final Set<String> EXCLUDED_PRODUCER_CONFIGS = Utils.mkSet(
CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG
);

public CruiseControlMetricsReporterConfig(Map<?, ?> originals, boolean doLog) {
super(CONFIG, originals, doLog);
}
Expand Down Expand Up @@ -155,7 +166,15 @@ public CruiseControlMetricsReporterConfig(Map<?, ?> originals, boolean doLog) {
ConfigDef.Type.INT,
DEFAULT_CRUISE_CONTROL_METRICS_BATCH_SIZE,
ConfigDef.Importance.LOW,
CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_DOC);
CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_DOC)
.define(CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.LOW,
CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG_DOC);
RECONFIGURABLE_CONFIGS.addAll(SslConfigs.RECONFIGURABLE_CONFIGS.stream()
.map(config -> PREFIX + config).collect(Collectors.toSet()));
RECONFIGURABLE_CONFIGS.add(CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG);
}

/**
Expand All @@ -173,7 +192,7 @@ public static String config(String baseConfigName) {
static Properties parseProducerConfigs(Map<String, ?> configMap) {
Properties props = new Properties();
for (Map.Entry<String, ?> entry : configMap.entrySet()) {
if (entry.getKey().startsWith(PREFIX)) {
if (entry.getKey().startsWith(PREFIX) && !EXCLUDED_PRODUCER_CONFIGS.contains(entry.getKey())) {
props.put(entry.getKey().replace(PREFIX, ""), entry.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,33 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.Assert;
import org.junit.Test;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.PREFIX;


public class CruiseControlMetricsReporterSslTest extends CruiseControlMetricsReporterTest {
Expand Down Expand Up @@ -53,6 +70,7 @@ public Properties overridingProps() {
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
props.setProperty(KafkaConfig.PasswordEncoderSecretProp(), "test");
return props;
}

Expand All @@ -75,4 +93,30 @@ private String appendPrefix(Object key) {
return CruiseControlMetricsReporterConfig.config((String) key);
}

@Test
public void testAlterConfig() throws Exception {
Properties props = new Properties();
setSecurityConfigs(props, "admin");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
AdminClient adminClient = AdminClient.create(props);

String brokerId = String.valueOf(_brokers.get(0).id());
ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
List<AlterConfigOp> ops = new ArrayList<>();

Map<Object, Object> sslProps = _brokers.get(0).config().entrySet().stream()
.filter(entry -> SslConfigs.RECONFIGURABLE_CONFIGS.contains(entry.getKey().toString()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
sslProps.forEach((k, v) -> {
String value = v instanceof Password ? ((Password) v).value() : v.toString();
ops.add(new AlterConfigOp(new ConfigEntry("listener.name.ssl." + k, value), AlterConfigOp.OpType.SET));
ops.add(new AlterConfigOp(new ConfigEntry(PREFIX + k, value), AlterConfigOp.OpType.SET));
});
ops.add(new AlterConfigOp(new ConfigEntry(CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG,
UUID.randomUUID().toString()), AlterConfigOp.OpType.SET));
AlterConfigsResult result = adminClient.incrementalAlterConfigs(Collections.singletonMap(brokerResource, ops));

result.values().get(brokerResource).get();
Thread.sleep(5000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
Expand All @@ -28,11 +29,13 @@ public class CCEmbeddedBroker implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CCEmbeddedBroker.class);
private final Map<SecurityProtocol, Integer> _ports;
private final Map<SecurityProtocol, String> _hosts;
private final Map<Object, Object> _config;
private final KafkaServer _kafkaServer;
private int _id;
private File _logDir;

public CCEmbeddedBroker(Map<Object, Object> config) {
_config = Collections.unmodifiableMap(config);
_ports = new HashMap<>();
_hosts = new HashMap<>();

Expand Down Expand Up @@ -116,6 +119,10 @@ public int id() {
return _id;
}

public Map<Object, Object> config() {
return _config;
}

/**
* @param protocol Security protocol.
* @return Address containing host and port.
Expand Down
Loading