Skip to content

Commit

Permalink
Additional improvements to monitor appender
Browse files Browse the repository at this point in the history
* Bump to log4j 2.24.0
* Use the annotation processor to generate Log4j2Plugins.dat in the
  monitor jar's META-INF/, rather than rely on Log4j2's deprecated
  package scanning to find and register the AccumuloMonitorAppender
* Simplify the generics for the AccumuloMonitorAppender.Builder
* Add cleanup to executor, to make a best effort to not leave it running
  when the appender is reconstructed due to configuration changes
* Simplify "canAppend" logic by just saving the executor as a member, so
  we can get the queue size from that
  • Loading branch information
ctubbsii committed Sep 20, 2024
1 parent 4878426 commit 5dea9d8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 22 deletions.
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
<version.curator>5.5.0</version.curator>
<version.errorprone>2.24.1</version.errorprone>
<version.hadoop>3.3.6</version.hadoop>
<version.log4j>2.24.0</version.log4j>
<version.opentelemetry>1.34.1</version.opentelemetry>
<version.powermock>2.0.9</version.powermock>
<version.slf4j>2.0.12</version.slf4j>
Expand Down Expand Up @@ -198,7 +199,7 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-bom</artifactId>
<version>2.23.1</version>
<version>${version.log4j}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -792,6 +793,11 @@
<artifactId>auto-service</artifactId>
<version>${version.auto-service}</version>
</path>
<path>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${version.log4j}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import org.apache.accumulo.core.Constants;
Expand Down Expand Up @@ -65,11 +64,11 @@
public class AccumuloMonitorAppender extends AbstractAppender {

@PluginBuilderFactory
public static <B extends Builder<B>> B newBuilder() {
return new Builder<B>().asBuilder();
public static Builder newBuilder() {
return new Builder();
}

public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
public static class Builder extends AbstractAppender.Builder<Builder>
implements org.apache.logging.log4j.core.util.Builder<AccumuloMonitorAppender> {

@PluginBuilderAttribute
Expand All @@ -81,7 +80,7 @@ public static class Builder<B extends Builder<B>> extends AbstractAppender.Build
@PluginBuilderAttribute
private int maxThreads = 2;

public Builder<B> setAsync(boolean async) {
public Builder setAsync(boolean async) {
this.async = async;
return this;
}
Expand All @@ -90,7 +89,7 @@ public boolean getAsync() {
return async;
}

public Builder<B> setQueueSize(int size) {
public Builder setQueueSize(int size) {
queueSize = size;
return this;
}
Expand All @@ -99,7 +98,7 @@ public int getQueueSize() {
return queueSize;
}

public Builder<B> setMaxThreads(int maxThreads) {
public Builder setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
return this;
}
Expand All @@ -119,8 +118,9 @@ public AccumuloMonitorAppender build() {
private final Gson gson = new Gson();
private final HttpClient httpClient;
private final Supplier<Optional<URI>> monitorLocator;
private final BooleanSupplier canAppend;
private final ThreadPoolExecutor executor;
private final boolean async;
private final int queueSize;

private ServerContext context;
private String path;
Expand All @@ -131,20 +131,15 @@ private AccumuloMonitorAppender(final String name, final Filter filter,
boolean async) {
super(name, filter, null, ignoreExceptions, properties);

this.executor = async ? new ThreadPoolExecutor(0, maxThreads, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()) : null;
final var builder = HttpClient.newBuilder();
this.httpClient = (async ? builder.executor(executor) : builder).build();
this.queueSize = queueSize;
this.async = async;
var builder = HttpClient.newBuilder();
if (async) {
var queue = new LinkedBlockingQueue<Runnable>();
builder =
builder.executor(new ThreadPoolExecutor(0, maxThreads, 30, TimeUnit.SECONDS, queue));
this.canAppend = () -> queue.size() < queueSize;
} else {
this.canAppend = () -> true;
}
this.httpClient = builder.build();

final ZcStat stat = new ZcStat();
monitorLocator = () -> {
final var stat = new ZcStat();
this.monitorLocator = () -> {
// lazily set up context/path
if (context == null) {
context = new ServerContext(SiteConfiguration.auto());
Expand Down Expand Up @@ -181,7 +176,7 @@ public void append(final LogEvent event) {
.setHeader("Content-Type", "application/json").build();

if (async) {
if (canAppend.getAsBoolean()) {
if (executor.getQueue().size() < queueSize) {
@SuppressWarnings("unused")
var future = httpClient.sendAsync(req, BodyHandlers.discarding());
} else {
Expand All @@ -196,6 +191,17 @@ public void append(final LogEvent event) {
});
}

@Override
protected boolean stop(long timeout, TimeUnit timeUnit, boolean changeLifeCycleState) {
if (changeLifeCycleState) {
setStopping();
}
if (executor != null) {
executor.shutdown();
}
return super.stop(timeout, timeUnit, changeLifeCycleState);
}

@SuppressFBWarnings(value = "INFORMATION_EXPOSURE_THROUGH_AN_ERROR_MESSAGE",
justification = "throwable is intended to be printed to output stream, to send to monitor")
private static String throwableToStacktrace(Throwable t) {
Expand Down

0 comments on commit 5dea9d8

Please sign in to comment.