diff --git a/pom.xml b/pom.xml index 1faae076330..a47435ace4d 100644 --- a/pom.xml +++ b/pom.xml @@ -151,6 +151,7 @@ 5.5.0 2.24.1 3.3.6 + 2.24.0 1.34.1 2.0.9 2.0.12 @@ -198,7 +199,7 @@ org.apache.logging.log4j log4j-bom - 2.23.1 + ${version.log4j} pom import @@ -792,6 +793,11 @@ auto-service ${version.auto-service} + + org.apache.logging.log4j + log4j-core + ${version.log4j} + diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java index 87733dbf1c4..39a8d716273 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java @@ -31,7 +31,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.function.BooleanSupplier; import java.util.function.Supplier; import org.apache.accumulo.core.Constants; @@ -65,11 +64,11 @@ public class AccumuloMonitorAppender extends AbstractAppender { @PluginBuilderFactory - public static > B newBuilder() { - return new Builder().asBuilder(); + public static Builder newBuilder() { + return new Builder(); } - public static class Builder> extends AbstractAppender.Builder + public static class Builder extends AbstractAppender.Builder implements org.apache.logging.log4j.core.util.Builder { @PluginBuilderAttribute @@ -81,7 +80,7 @@ public static class Builder> extends AbstractAppender.Build @PluginBuilderAttribute private int maxThreads = 2; - public Builder setAsync(boolean async) { + public Builder setAsync(boolean async) { this.async = async; return this; } @@ -90,7 +89,7 @@ public boolean getAsync() { return async; } - public Builder setQueueSize(int size) { + public Builder setQueueSize(int size) { queueSize = size; return this; } @@ -99,7 +98,7 @@ public int getQueueSize() { return queueSize; } - public Builder setMaxThreads(int maxThreads) { + public Builder setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; return this; } @@ -119,8 +118,9 @@ public AccumuloMonitorAppender build() { private final Gson gson = new Gson(); private final HttpClient httpClient; private final Supplier> monitorLocator; - private final BooleanSupplier canAppend; + private final ThreadPoolExecutor executor; private final boolean async; + private final int queueSize; private ServerContext context; private String path; @@ -131,20 +131,15 @@ private AccumuloMonitorAppender(final String name, final Filter filter, boolean async) { super(name, filter, null, ignoreExceptions, properties); + this.executor = async ? new ThreadPoolExecutor(0, maxThreads, 30, TimeUnit.SECONDS, + new LinkedBlockingQueue()) : null; + final var builder = HttpClient.newBuilder(); + this.httpClient = (async ? builder.executor(executor) : builder).build(); + this.queueSize = queueSize; this.async = async; - var builder = HttpClient.newBuilder(); - if (async) { - var queue = new LinkedBlockingQueue(); - builder = - builder.executor(new ThreadPoolExecutor(0, maxThreads, 30, TimeUnit.SECONDS, queue)); - this.canAppend = () -> queue.size() < queueSize; - } else { - this.canAppend = () -> true; - } - this.httpClient = builder.build(); - final ZcStat stat = new ZcStat(); - monitorLocator = () -> { + final var stat = new ZcStat(); + this.monitorLocator = () -> { // lazily set up context/path if (context == null) { context = new ServerContext(SiteConfiguration.auto()); @@ -181,7 +176,7 @@ public void append(final LogEvent event) { .setHeader("Content-Type", "application/json").build(); if (async) { - if (canAppend.getAsBoolean()) { + if (executor.getQueue().size() < queueSize) { @SuppressWarnings("unused") var future = httpClient.sendAsync(req, BodyHandlers.discarding()); } else { @@ -196,6 +191,17 @@ public void append(final LogEvent event) { }); } + @Override + protected boolean stop(long timeout, TimeUnit timeUnit, boolean changeLifeCycleState) { + if (changeLifeCycleState) { + setStopping(); + } + if (executor != null) { + executor.shutdown(); + } + return super.stop(timeout, timeUnit, changeLifeCycleState); + } + @SuppressFBWarnings(value = "INFORMATION_EXPOSURE_THROUGH_AN_ERROR_MESSAGE", justification = "throwable is intended to be printed to output stream, to send to monitor") private static String throwableToStacktrace(Throwable t) {