Skip to content

Commit

Permalink
add option for automatic JMX retry (#3511)
Browse files Browse the repository at this point in the history
  • Loading branch information
SylvainJuge authored Feb 9, 2024
1 parent c0a9382 commit 826fbf9
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 71 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
===== Features
* Added a configuration option to use queues in names of spring-rabbit transactions - {pull}3424[#3424]
* Add option to retry JMX metrics capture in case of exception - {pull}3511[#3511]
[float]
===== Bug fixes
* Add support to CLI attach download for new agent signature for 1.46.0+ - {pull}3513[#3513]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
*/
package co.elastic.apm.agent.jmx;

import co.elastic.apm.agent.tracer.configuration.TimeDuration;
import co.elastic.apm.agent.tracer.configuration.TimeDurationValueConverter;
import org.stagemonitor.configuration.ConfigurationOption;
import org.stagemonitor.configuration.ConfigurationOptionProvider;

import java.util.Collections;
import java.util.List;

import static co.elastic.apm.agent.tracer.configuration.RangeValidator.isNotInRange;

public class JmxConfiguration extends ConfigurationOptionProvider {

private ConfigurationOption<List<JmxMetric>> captureJmxMetrics = ConfigurationOption.<List<JmxMetric>>builder(JmxMetric.TokenValueConverter.INSTANCE, List.class)
Expand Down Expand Up @@ -137,4 +141,15 @@ public class JmxConfiguration extends ConfigurationOptionProvider {
/**
 * Returns the configuration option that holds the list of JMX metrics to capture.
 * <p>
 * Package-private: the option object itself (not just its value) is returned, which lets
 * callers such as {@code JmxMetricTracker} attach a change listener to it.
 *
 * @return the {@code captureJmxMetrics} option
 */
ConfigurationOption<List<JmxMetric>> getCaptureJmxMetrics() {
return captureJmxMetrics;
}

/**
 * Internal option {@code jmx_failed_retry_interval}: interval at which failed JMX metric
 * registrations are retried.
 * <p>
 * Defaults to {@code 0m}. {@code JmxMetricTracker} only schedules its retry task when this
 * option is not at its default, so the default effectively disables retries.
 * <p>
 * NOTE(review): the validator is built from the range {@code 1ms}..{@code 59s}; going by the
 * helper's name it presumably rejects values inside that range, so that an enabled interval is
 * at least one minute -- confirm against {@code RangeValidator}.
 */
private final ConfigurationOption<TimeDuration> faildRetryInterval = TimeDurationValueConverter.durationOption("m")
.key("jmx_failed_retry_interval")
.tags("internal")
.description("If set to a value greater or equal to 1m, the agent will retry failed JMX metric registrations.")
.addValidator(isNotInRange(TimeDuration.of("1ms"), TimeDuration.of("59s")))
.buildWithDefault(TimeDuration.of("0m"));

/**
 * Returns the option controlling how often failed JMX metric registrations are retried.
 * <p>
 * NOTE(review): "Faild" is a misspelling of "Failed"; the name is kept as-is because
 * {@code JmxMetricTracker} calls this accessor by its current name.
 *
 * @return the {@code jmx_failed_retry_interval} option
 */
public ConfigurationOption<TimeDuration> getFaildRetryInterval() {
return faildRetryInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import co.elastic.apm.agent.metrics.DoubleSupplier;
import co.elastic.apm.agent.metrics.Labels;
import co.elastic.apm.agent.metrics.MetricRegistry;
import co.elastic.apm.agent.tracer.GlobalLocks;
import co.elastic.apm.agent.sdk.internal.util.ExecutorUtils;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import co.elastic.apm.agent.tracer.GlobalLocks;
import co.elastic.apm.agent.tracer.configuration.TimeDuration;
import org.stagemonitor.configuration.ConfigurationOption;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -54,6 +56,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class JmxMetricTracker extends AbstractLifecycleListener {
Expand All @@ -68,9 +71,17 @@ public class JmxMetricTracker extends AbstractLifecycleListener {
@Nullable
private volatile NotificationListener listener;

private final List<JmxMetric> failedMetrics;

@Nullable
private ScheduledExecutorService retryExecutor;

/**
 * Creates a tracker wired to the given tracer's JMX configuration and metric registry.
 *
 * @param tracer tracer supplying the {@link JmxConfiguration} and the metric registry
 */
public JmxMetricTracker(ElasticApmTracer tracer) {
    // Synchronized wrapper so that single add() calls need no explicit lock.
    // The explicit type argument is kept on purpose: a diamond in argument position
    // is not inferred from the assignment target on older language levels.
    this.failedMetrics = Collections.synchronizedList(new ArrayList<JmxMetric>());

    this.jmxConfiguration = tracer.getConfig(JmxConfiguration.class);
    this.metricRegistry = tracer.getMetricRegistry();
}

@Override
Expand Down Expand Up @@ -175,19 +186,52 @@ synchronized void init(final MBeanServer platformMBeanServer) {
jmxConfiguration.getCaptureJmxMetrics().addChangeListener(new ConfigurationOption.ChangeListener<List<JmxMetric>>() {
@Override
public void onChange(ConfigurationOption<?> configurationOption, List<JmxMetric> oldValue, List<JmxMetric> newValue) {
List<JmxMetricRegistration> oldRegistrations = compileJmxMetricRegistrations(oldValue, platformMBeanServer);
List<JmxMetricRegistration> newRegistrations = compileJmxMetricRegistrations(newValue, platformMBeanServer);
List<JmxMetric> registrationErrors = new ArrayList<JmxMetric>(); // those are not needed
List<JmxMetricRegistration> oldRegistrations = compileJmxMetricRegistrations(oldValue, platformMBeanServer, registrationErrors);

List<JmxMetricRegistration> newRegistrations;
synchronized (failedMetrics) {
failedMetrics.clear();
newRegistrations = compileJmxMetricRegistrations(newValue, platformMBeanServer, failedMetrics);
}


for (JmxMetricRegistration addedRegistration : removeAll(oldRegistrations, newRegistrations)) {
addedRegistration.register(platformMBeanServer, metricRegistry);
}
for (JmxMetricRegistration deletedRegistration : removeAll(newRegistrations, oldRegistrations)) {
deletedRegistration.unregister(metricRegistry);
}

}
});
register(jmxConfiguration.getCaptureJmxMetrics().get(), platformMBeanServer);

ConfigurationOption<TimeDuration> failedRetryConfig = jmxConfiguration.getFaildRetryInterval();
if (!failedRetryConfig.isDefault()) {
long retryMillis = failedRetryConfig.getValue().getMillis();
if (retryExecutor != null) {
ExecutorUtils.shutdownAndWaitTermination(retryExecutor);
}

retryExecutor = ExecutorUtils.createSingleThreadSchedulingDaemonPool("jmx-retry");
retryExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
retryFailedJmx(platformMBeanServer);
}
}, retryMillis, retryMillis, TimeUnit.MILLISECONDS);
}

register(jmxConfiguration.getCaptureJmxMetrics().get(), platformMBeanServer, failedMetrics);
}

/**
 * Attempts to register again every JMX metric currently recorded as failed.
 * <p>
 * The failed list is snapshotted and emptied while holding its monitor, then the snapshot is
 * registered with the same list as the failure sink, so metrics that fail again are re-queued.
 * Package-private for testing.
 *
 * @param platformMBeanServer MBean server to register the metrics against
 */
void retryFailedJmx(MBeanServer platformMBeanServer) {
    synchronized (failedMetrics) {
        List<JmxMetric> pending = new ArrayList<>(failedMetrics);
        failedMetrics.clear();
        register(pending, platformMBeanServer, failedMetrics);
    }
}

private void registerMBeanNotificationListener(final MBeanServer server) {
Expand Down Expand Up @@ -217,7 +261,7 @@ private void addMBean(ObjectName mBeanName, JmxMetric jmxMetric) {
ObjectName metricName = jmxMetric.getObjectName();
if (metricName.apply(mBeanName) || matchesJbossStatisticsPool(mBeanName, metricName, server)) {
logger.debug("MBean added at runtime: {}", jmxMetric.getObjectName());
register(Collections.singletonList(jmxMetric), server);
register(Collections.singletonList(jmxMetric), server, failedMetrics);
}
}

Expand Down Expand Up @@ -280,28 +324,36 @@ private static <T> List<T> removeAll(List<T> removeFromThis, List<T> toRemove) {
return result;
}

private void register(List<JmxMetric> jmxMetrics, MBeanServer server) {
for (JmxMetricRegistration registration : compileJmxMetricRegistrations(jmxMetrics, server)) {
private void register(List<JmxMetric> jmxMetrics, MBeanServer server, List<JmxMetric> failedMetrics) {
for (JmxMetricRegistration registration : compileJmxMetricRegistrations(jmxMetrics, server, failedMetrics)) {
registration.register(server, metricRegistry);
}
}

/**
* A single {@link JmxMetric} can yield multiple {@link JmxMetricRegistration}s if the {@link JmxMetric} contains multiple attributes
*
* @param jmxMetrics JMX metrics to register
* @param server MBean server
* @param failedMetrics list of JMX metrics that failed to register (out)
*/
private List<JmxMetricRegistration> compileJmxMetricRegistrations(List<JmxMetric> jmxMetrics, MBeanServer server) {
List<JmxMetricRegistration> registrations = new ArrayList<>();
private List<JmxMetricRegistration> compileJmxMetricRegistrations(List<JmxMetric> jmxMetrics, MBeanServer server, List<JmxMetric> failedMetrics) {
List<JmxMetricRegistration> globalRegistrations = new ArrayList<>();
for (JmxMetric jmxMetric : jmxMetrics) {
List<JmxMetricRegistration> metricRegistrations = new ArrayList<>();
try {
addJmxMetricRegistration(jmxMetric, registrations, server);
addJmxMetricRegistration(jmxMetric, metricRegistrations, server);
globalRegistrations.addAll(metricRegistrations);
} catch (Exception e) {
failedMetrics.add(jmxMetric);
logger.error("Failed to register JMX metric {}", jmxMetric.toString(), e);
}

}
return registrations;
return globalRegistrations;
}

private static void addJmxMetricRegistration(final JmxMetric jmxMetric, List<JmxMetricRegistration> registrations, MBeanServer server) throws JMException {
private void addJmxMetricRegistration(final JmxMetric jmxMetric, List<JmxMetricRegistration> registrations, MBeanServer server) throws JMException {
Set<ObjectInstance> mbeans = server.queryMBeans(jmxMetric.getObjectName(), null);
if (!mbeans.isEmpty()) {
logger.debug("Found mbeans for object name {}", jmxMetric.getObjectName());
Expand Down Expand Up @@ -355,20 +407,21 @@ private static String metricPrepend(Labels labels) {
return "";
}

private static void addJmxMetricRegistration(JmxMetric jmxMetric, List<JmxMetricRegistration> registrations, ObjectName objectName, Object value, JmxMetric.Attribute attribute, String attributeName, String metricPrepend) throws AttributeNotFoundException {
private void addJmxMetricRegistration(JmxMetric jmxMetric, List<JmxMetricRegistration> registrations, ObjectName objectName, Object value, JmxMetric.Attribute attribute, String attributeName, @Nullable String metricPrepend) throws AttributeNotFoundException {
String effectiveAttributeName = metricPrepend == null ? attributeName : metricPrepend + attributeName;
boolean unsubscribeOnError = jmxConfiguration.getFaildRetryInterval().isDefault();
if (value instanceof Number) {
logger.debug("Found number attribute {}={}", attribute.getJmxAttributeName(), value);
registrations.add(
new JmxMetricRegistration(
attribute.getMetricName(
metricPrepend == null ?
attributeName :
metricPrepend + attributeName
effectiveAttributeName
),
attribute.getLabels(objectName),
attributeName,
null,
objectName
objectName,
unsubscribeOnError
)
);
} else if (value instanceof CompositeData) {
Expand All @@ -380,14 +433,12 @@ private static void addJmxMetricRegistration(JmxMetric jmxMetric, List<JmxMetric
new JmxMetricRegistration(
attribute.getCompositeMetricName(
key,
metricPrepend == null ?
attributeName :
metricPrepend + attributeName
),
effectiveAttributeName),
attribute.getLabels(objectName),
attributeName,
key,
objectName
objectName,
unsubscribeOnError
)
);
} else {
Expand All @@ -411,13 +462,15 @@ static class JmxMetricRegistration {
@Nullable
private final String compositeDataKey;
private final ObjectName objectName;
private final boolean unsubscribeOnError;

private JmxMetricRegistration(String metricName, Labels labels, String jmxAttribute, @Nullable String compositeDataKey, ObjectName objectName) {
private JmxMetricRegistration(String metricName, Labels labels, String jmxAttribute, @Nullable String compositeDataKey, ObjectName objectName, boolean unsubscribeOnError) {
this.metricName = metricName;
this.labels = labels.immutableCopy();
this.jmxAttribute = jmxAttribute;
this.compositeDataKey = compositeDataKey;
this.objectName = objectName;
this.unsubscribeOnError = unsubscribeOnError;
}


Expand All @@ -427,13 +480,17 @@ void register(final MBeanServer server, final MetricRegistry metricRegistry) {
@Override
public double get() {
try {
double value;
if (compositeDataKey == null) {
return ((Number) server.getAttribute(objectName, jmxAttribute)).doubleValue();
value = ((Number) server.getAttribute(objectName, jmxAttribute)).doubleValue();
} else {
return ((Number) ((CompositeData) server.getAttribute(objectName, jmxAttribute)).get(compositeDataKey)).doubleValue();
value = ((Number) ((CompositeData) server.getAttribute(objectName, jmxAttribute)).get(compositeDataKey)).doubleValue();
}
return value;
} catch (InstanceNotFoundException | AttributeNotFoundException e) {
unregister(metricRegistry);
if (unsubscribeOnError) {
unregister(metricRegistry);
}
return Double.NaN;
} catch (Exception e) {
return Double.NaN;
Expand Down Expand Up @@ -473,5 +530,8 @@ public void stop() throws Exception {
if (logManagerPropertyPoller != null) {
logManagerPropertyPoller.interrupt();
}
if (retryExecutor != null) {
ExecutorUtils.shutdownAndWaitTermination(retryExecutor);
}
}
}
Loading

0 comments on commit 826fbf9

Please sign in to comment.