diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index bc57f62ff70..68f4c2abdb8 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -38,10 +38,10 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import jakarta.inject.Singleton; @@ -62,6 +62,7 @@ import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.master.thrift.TableInfo; import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; @@ -78,6 +79,8 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo; +import org.apache.accumulo.monitor.rest.compactions.external.RunningCompactions; +import org.apache.accumulo.monitor.rest.compactions.external.RunningCompactorDetails; import org.apache.accumulo.monitor.util.logging.RecentLogs; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.HighlyAvailableService; @@ -101,6 +104,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Suppliers; + /** * Serve manager statistics with an embedded web server. */ @@ -612,7 +617,17 @@ public static class CompactionStats { private final Map allCompactions = new HashMap<>(); private final RecentLogs recentLogs = new RecentLogs(); private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); - private final Map ecRunningMap = new ConcurrentHashMap<>(); + + // When there are a large amount of external compactions running the list of external compactions + // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid + // creating the list of running external compactions in memory per web request. If multiple + // request come in around the same time they should use the same list. It is still possible to + // have multiple list in memory if one request obtains a copy and then another request comes in + // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier + // is the less likely we are to have multiple list of external compactions in memory, however + // increasing the timeout will make the monitor less responsive. + private final Supplier extCompactionSnapshot = Suppliers + .memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), 30, TimeUnit.SECONDS); private long scansFetchedNanos = 0L; private long compactsFetchedNanos = 0L; private long ecInfoFetchedNanos = 0L; @@ -667,12 +682,17 @@ public synchronized ExternalCompactionInfo getCompactorsInfo() { return ecInfo; } - /** - * Fetch running compactions from Compaction Coordinator. Chose not to restrict the frequency of - * user fetches since RPC calls are going to the coordinator. This allows for fine grain updates - * of external compaction progress. - */ - public synchronized Map fetchRunningInfo() { + private static class ExternalCompactionsSnapshot { + public final RunningCompactions runningCompactions; + public final Map ecRunningMap; + + private ExternalCompactionsSnapshot(Map ecRunningMap) { + this.ecRunningMap = Collections.unmodifiableMap(ecRunningMap); + this.runningCompactions = new RunningCompactions(ecRunningMap); + } + } + + private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { if (coordinatorHost.isEmpty()) { throw new IllegalStateException(coordinatorMissingMsg); } @@ -686,16 +706,20 @@ public synchronized Map fetchRunningInfo() { throw new IllegalStateException("Unable to get running compactions from " + ccHost, e); } - ecRunningMap.clear(); - if (running.getCompactions() != null) { - ecRunningMap.putAll(running.getCompactions()); - } + return new ExternalCompactionsSnapshot(running.getCompactions()); + } - return ecRunningMap; + public RunningCompactions getRunnningCompactions() { + return extCompactionSnapshot.get().runningCompactions; } - public Map getEcRunningMap() { - return ecRunningMap; + public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) { + TExternalCompaction extCompaction = + extCompactionSnapshot.get().ecRunningMap.get(ecid.canonical()); + if (extCompaction == null) { + return null; + } + return new RunningCompactorDetails(extCompaction); } private CompactionCoordinatorService.Client getCoordinator(HostAndPort address) { diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java index c6eab1868c4..72d54d70a4e 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java @@ -60,7 +60,7 @@ public Compactors getCompactors() { @Path("running") @GET public RunningCompactions getRunning() { - return new RunningCompactions(monitor.fetchRunningInfo()); + return monitor.getRunnningCompactions(); } @Path("details") @@ -68,16 +68,11 @@ public RunningCompactions getRunning() { public RunningCompactorDetails getDetails(@QueryParam("ecid") @NotNull String ecid) { // make parameter more user-friendly by ensuring the ecid prefix is present ecid = ExternalCompactionId.from(ecid).canonical(); - var ecMap = monitor.getEcRunningMap(); - var externalCompaction = ecMap.get(ecid); - if (externalCompaction == null) { - // map could be old so fetch all running compactions and try again - ecMap = monitor.fetchRunningInfo(); - externalCompaction = ecMap.get(ecid); - if (externalCompaction == null) { - throw new IllegalStateException("Failed to find details for ECID: " + ecid); - } + var runningCompactorDetails = + monitor.getRunningCompactorDetails(ExternalCompactionId.from(ecid)); + if (runningCompactorDetails == null) { + throw new IllegalStateException("Failed to find details for ECID: " + ecid); } - return new RunningCompactorDetails(externalCompaction); + return runningCompactorDetails; } }