Adds property to minor compact based on age of data in memory
Adds a new property that will initiate a minor compaction based on the
age of data in memory. Tablets now roughly track the age of the first
write to a tablet's in-memory map, and when this age exceeds a configured
threshold a minor compaction is initiated.

fixes apache#3397
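To show how the new setting is used, here is a minimal sketch of opting a table into the age-based flush, modeled on how the integration test below configures it; the client properties path, table name, and the 30s value are illustrative assumptions, not part of this commit.

import java.util.Map;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.Property;

public class MaxAgeExample {
  public static void main(String[] args) throws Exception {
    // Sketch only: the properties file path and table name are illustrative.
    try (AccumuloClient client =
        Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {
      // create a table that flushes buffered writes within roughly 30 seconds of the first write
      var ntc = new NewTableConfiguration();
      ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), "30s"));
      client.tableOperations().create("example_table", ntc);

      // or set the property on an existing table
      client.tableOperations().setProperty("example_table",
          Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), "30s");
    }
  }
}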
keith-turner committed Sep 3, 2024
1 parent dcf951f commit 8cb0982
Showing 6 changed files with 256 additions and 10 deletions.
14 changes: 11 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -908,10 +908,18 @@ public enum Property {
TABLE_MAX_END_ROW_SIZE("table.split.endrow.size.max", "10k", PropertyType.BYTES,
"Maximum size of end row.", "1.7.0"),
TABLE_MINC_COMPACT_IDLETIME("table.compaction.minor.idle", "5m", PropertyType.TIMEDURATION,
"After a tablet has been idle (no mutations) for this time period it may have its "
+ "in-memory map flushed to disk in a minor compaction. There is no guarantee an idle "
+ "tablet will be compacted.",
"When the time since the last write to a tablets in memory map exceeds this threshold. "
+ " a minor compaction may be initiated. There is no guarantee an idle tablet will be compacted.",
"1.3.5"),
TABLE_MINC_COMPACT_MAXAGE("table.compaction.minor.age", "365000d", PropertyType.TIMEDURATION,
"When the time since the first write to a tablets in memory map exceeds this threshold "
+ "a minor compaction may be initiated. This determines the maximum amount of time new data can "
+ "be buffered in memory before being flushed to a file. This is useful when using scan servers "
+ "in conjunction with the property " + SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey()
+ ". These two properties together can be used to control that amount of time it takes for a scan "
+ "server to see a write to a tablet server. The default value of this property is set to such a "
+ "high value that is should never cause a minor compaction.",
"3.1.0"),
TABLE_COMPACTION_DISPATCHER("table.compaction.dispatcher",
SimpleCompactionDispatcher.class.getName(), PropertyType.CLASSNAME,
"A configurable dispatcher that decides what compaction service a table should use.",
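As a rough, hedged illustration of the pairing described in the new property's documentation (the class name and the 30s/10s values below are illustrative assumptions): an eventual-consistency scan should see a write after roughly the table's table.compaction.minor.age plus the scan server's cached tablet metadata expiration, so both are kept small when low staleness matters.

import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.conf.Property;

public class ScanLatencyBudget {
  public static void main(String[] args) {
    // Sketch only: rough worst-case staleness for an eventual scan, assuming the write is
    // minor compacted once it reaches the max age and the scan server then refreshes its
    // cached tablet metadata. The 30s/10s values are illustrative.
    long maxAgeMillis = TimeUnit.SECONDS.toMillis(30); // table.compaction.minor.age
    long metadataCacheMillis = TimeUnit.SECONDS.toMillis(10); // scan server metadata cache expiration
    System.out.printf("approximate worst-case eventual-scan staleness: %d ms%n",
        maxAgeMillis + metadataCacheMillis);
    // the two property keys involved
    System.out.println(Property.TABLE_MINC_COMPACT_MAXAGE.getKey());
    System.out.println(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey());
  }
}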
@@ -36,6 +36,7 @@
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_TABLET_MIGRATION_POOL;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -55,6 +56,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
@@ -78,6 +80,7 @@
import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
@@ -586,9 +589,10 @@ private void manageMemory() {
}
}

public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime,
long mincSize) {
memUsageReports.add(new TabletMemoryReport(tablet, lastCommitTime, size, mincSize));
public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize,
Duration elapsedSinceFirstWrite) {
memUsageReports.add(
new TabletMemoryReport(tablet, lastCommitTime, size, mincSize, elapsedSinceFirstWrite));
}

public void tabletClosed(KeyExtent extent) {
@@ -698,6 +702,7 @@ public synchronized ScanFileManager newScanFileManager(ScanDispatch scanDispatch

private final AtomicLong lastReportedSize = new AtomicLong();
private final AtomicLong lastReportedMincSize = new AtomicLong();
private final AtomicReference<Timer> firstReportedCommitTimer = new AtomicReference<>(null);
private volatile long lastReportedCommitTime = 0;

public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) {
@@ -722,6 +727,17 @@ public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) {
}

long currentTime = System.currentTimeMillis();

if (size == 0) {
// when a new in-memory map is created this method is called with a size of zero, so use that
// to reset the first write timer
firstReportedCommitTimer.set(null);
} else if (firstReportedCommitTimer.get() == null) {
// this is the first time a non-zero size was seen for this in-memory map, so consider this
// the time of the first write
firstReportedCommitTimer.compareAndSet(null, Timer.startNew());
}

if ((delta > 32000 || delta < 0 || (currentTime - lastReportedCommitTime > 1000))
&& lastReportedSize.compareAndSet(lrs, totalSize)) {
if (delta > 0) {
@@ -731,7 +747,11 @@ public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) {
}

if (report) {
memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize);
// read the atomic reference once into a local variable since it is otherwise read twice when
// computing the duration.
Timer localTimer = firstReportedCommitTimer.get();
Duration elapsedSinceFirstWrite = localTimer == null ? Duration.ZERO : localTimer.elapsed();
memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize,
elapsedSinceFirstWrite);
}
}

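The first-write tracking added above reduces to a small pattern: clear the timer when the in-memory map reports a size of zero, start it on the first non-zero report, and read the reference once when computing the elapsed time. Below is a self-contained sketch of that pattern using java.time; the real code uses Accumulo's Timer, and the class name here is illustrative.

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

public class FirstWriteTracker {
  // null means no write has been seen for the current in-memory map
  private final AtomicReference<Instant> firstWrite = new AtomicReference<>(null);

  public void report(long inMemoryMapSize) {
    if (inMemoryMapSize == 0) {
      // a fresh, empty in-memory map: clear the timer
      firstWrite.set(null);
    } else {
      // first non-zero size seen: record it as the first write, unless another thread already did
      firstWrite.compareAndSet(null, Instant.now());
    }
  }

  public Duration elapsedSinceFirstWrite() {
    Instant start = firstWrite.get(); // read once; it may be cleared concurrently
    return start == null ? Duration.ZERO : Duration.between(start, Instant.now());
  }
}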
@@ -56,6 +56,7 @@ public class LargestFirstMemoryManager {
private double compactionThreshold;
private long maxObserved;
private final HashMap<TableId,Long> mincIdleThresholds = new HashMap<>();
private final HashMap<TableId,Long> mincAgeThresholds = new HashMap<>();
private ServerContext context = null;

private static class TabletInfo {
@@ -145,6 +146,12 @@ protected long getMinCIdleThreshold(KeyExtent extent) {
return mincIdleThresholds.get(tableId);
}

protected long getMaxAge(KeyExtent extent) {
TableId tableId = extent.tableId();
return mincAgeThresholds.computeIfAbsent(tableId, tid -> context.getTableConfiguration(tid)
.getTimeInMillis(Property.TABLE_MINC_COMPACT_MAXAGE));
}

protected boolean tableExists(TableId tableId) {
// make sure that the table still exists by checking if it has a configuration
return context.getTableConfiguration(tableId) != null;
@@ -191,7 +198,8 @@ public List<KeyExtent> tabletsToMinorCompact(List<TabletMemoryReport> tablets) {
TabletInfo tabletInfo = new TabletInfo(tablet, memTabletSize, idleTime, timeMemoryLoad);
try {
// If the table was deleted, getMinCIdleThreshold will throw an exception
if (idleTime > getMinCIdleThreshold(tablet)) {
if (idleTime > getMinCIdleThreshold(tablet)
|| ts.getElapsedSinceFirstWrite().toMillis() > getMaxAge(tablet)) {
largestIdleMemTablets.put(timeMemoryLoad, tabletInfo);
}
} catch (IllegalArgumentException e) {
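A hedged sketch of the widened candidate check applied above: a tablet now becomes eligible for minor compaction either when it has been idle longer than table.compaction.minor.idle or when its oldest buffered write is older than table.compaction.minor.age. The class and method names below are illustrative, not part of this commit.

import java.time.Duration;

public final class MincCandidateCheck {
  static boolean shouldMinorCompact(long idleTimeMillis, Duration elapsedSinceFirstWrite,
      long idleThresholdMillis, long maxAgeMillis) {
    // idle rule: no writes for longer than table.compaction.minor.idle
    // age rule: the first buffered write is older than table.compaction.minor.age
    return idleTimeMillis > idleThresholdMillis
        || elapsedSinceFirstWrite.toMillis() > maxAgeMillis;
  }
}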
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.tserver.memory;

import java.time.Duration;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.tserver.tablet.Tablet;

@@ -27,13 +29,15 @@ public class TabletMemoryReport implements Cloneable {
private final long lastCommitTime;
private final long memTableSize;
private final long minorCompactingMemTableSize;
private final Duration elapsedSinceFirstWrite;

public TabletMemoryReport(Tablet tablet, long lastCommitTime, long memTableSize,
long minorCompactingMemTableSize) {
long minorCompactingMemTableSize, Duration elapsedSinceFirstWrite) {
this.tablet = tablet;
this.lastCommitTime = lastCommitTime;
this.memTableSize = memTableSize;
this.minorCompactingMemTableSize = minorCompactingMemTableSize;
this.elapsedSinceFirstWrite = elapsedSinceFirstWrite;
}

public KeyExtent getExtent() {
@@ -48,6 +52,10 @@ public long getLastCommitTime() {
return lastCommitTime;
}

public Duration getElapsedSinceFirstWrite() {
return elapsedSinceFirstWrite;
}

public long getMemTableSize() {
return memTableSize;
}
@@ -24,6 +24,7 @@
import static org.easymock.EasyMock.replay;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
@@ -194,6 +195,25 @@ public void testDeletedTable() {
assertEquals(extent, tabletsToMinorCompact.get(0));
}

@Test
public void testMaxAge() {
LargestFirstMemoryManagerUnderTest mgr = new LargestFirstMemoryManagerUnderTest();
mgr.init(context);
List<KeyExtent> tabletsToMinorCompact;

// nothing to do
tabletsToMinorCompact =
mgr.tabletsToMinorCompact(tablets(t(k("x"), ZERO, 1000, 0), t(k("y"), ZERO, 2000, 0)));
assertEquals(0, tabletsToMinorCompact.size());

// a tablet that exceeds the configured max age should need to compact
tabletsToMinorCompact =
mgr.tabletsToMinorCompact(tablets(t(k("x"), ZERO, 1000, 0, Duration.ofMinutes(14)),
t(k("y"), ZERO, 2000, 0, Duration.ofMinutes(16))));
assertEquals(1, tabletsToMinorCompact.size());
assertEquals(k("y"), tabletsToMinorCompact.get(0));
}

private static class LargestFirstMemoryManagerUnderTest extends LargestFirstMemoryManager {

public long currentTime = ZERO;
@@ -208,6 +228,11 @@ protected long getMinCIdleThreshold(KeyExtent extent) {
return MINUTES.toMillis(15);
}

@Override
protected long getMaxAge(KeyExtent extent) {
return MINUTES.toMillis(15);
}

@Override
protected boolean tableExists(TableId tableId) {
return true;
@@ -247,7 +272,18 @@ private static KeyExtent k(String endRow) {
}

private TabletMemoryReport t(KeyExtent ke, long lastCommit, long memSize, long compactingSize) {
return new TabletMemoryReport(null, lastCommit, memSize, compactingSize) {
return new TabletMemoryReport(null, lastCommit, memSize, compactingSize, Duration.ZERO) {
@Override
public KeyExtent getExtent() {
return ke;
}
};
}

private TabletMemoryReport t(KeyExtent ke, long lastCommit, long memSize, long compactingSize,
Duration elapsedSinceFirstWrite) {
return new TabletMemoryReport(null, lastCommit, memSize, compactingSize,
elapsedSinceFirstWrite) {
@Override
public KeyExtent getExtent() {
return ke;
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;

import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.EVENTUAL;
import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.IMMEDIATE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;

public class ScanServerMaxLatencyIT extends ConfigurableMacBase {

@Override
protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "2s");
}

@Test
public void testMaxLatency() throws Exception {
final String[] tables = this.getUniqueNames(3);
final String table1 = tables[0];
final String table2 = tables[1];
final String table3 = tables[2];

getCluster().getConfig().setNumScanServers(1);
getCluster().getClusterControl().startAllServers(ServerType.SCAN_SERVER);

ExecutorService executor = Executors.newCachedThreadPool();
try (var client = Accumulo.newClient().from(getClientProperties()).build()) {

Wait.waitFor(() -> !client.instanceOperations().getScanServers().isEmpty());

var ntc = new NewTableConfiguration();
ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), "2s"));
client.tableOperations().create(table1, ntc);
client.tableOperations().create(table2);
ntc = new NewTableConfiguration();
ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_IDLETIME.getKey(), "2s"));
client.tableOperations().create(table3, ntc);

Timer timer = Timer.startNew();

executor.submit(createWriterTask(client, table1, timer));
executor.submit(createWriterTask(client, table2, timer));

long lastMaxSeen = -1;
int changes = 0;

List<Long> deltas = new ArrayList<>();

while (changes < 4) {
Thread.sleep(250);
var currElapsed = timer.elapsed(TimeUnit.MILLISECONDS);
var maxElapsedInTable = readMaxElapsed(client, EVENTUAL, table1);

if (maxElapsedInTable > 0 && maxElapsedInTable != lastMaxSeen) {
log.info("new max elapsed seen {} {}", lastMaxSeen, maxElapsedInTable);
changes++;
lastMaxSeen = maxElapsedInTable;
}

if (maxElapsedInTable > 0) {
// This is the difference between the elapsed time written to the table and the most recent
// elapsed time.
deltas.add(currElapsed - maxElapsedInTable);
}

// The other table does not have the setting to minor compact based on age, so we should never
// see any data for it from the scan server.
assertEquals(-1, readMaxElapsed(client, EVENTUAL, table2));
// The background thread is writing to this table every 100ms so it should not be considered
// idle and therefore should not minor compact.
assertEquals(-1, readMaxElapsed(client, EVENTUAL, table3));
}

var stats = deltas.stream().mapToLong(l -> l).summaryStatistics();
log.info("Delta stats : {}", stats);
// Should usually see data within 4 seconds, but not always, because the timing settings
// determine when things should start to happen and not when they are guaranteed to finish.
// Would expect the average to be less than 4 seconds and the max less than 8 seconds. These
// numbers may not hold if running the test on a heavily loaded machine.
assertTrue(stats.getAverage() > 500 && stats.getAverage() < 4000);
assertTrue(stats.getMax() < 8000);
assertTrue(stats.getCount() > 9);

executor.shutdownNow();
executor.awaitTermination(600, TimeUnit.SECONDS);

assertEquals(-1, readMaxElapsed(client, EVENTUAL, table2));
// This test assumes the 2nd table returns nothing because it is read via a scan server.
// Validate this test assumption by doing an immediate scan using the tablet server, which
// should return data.
assertTrue(readMaxElapsed(client, IMMEDIATE, table2) > 0);
// Now that nothing is writing, it is expected that an immediate scan will see any data an
// eventual scan would see.
assertTrue(
readMaxElapsed(client, IMMEDIATE, table1) >= readMaxElapsed(client, EVENTUAL, table1));
}

}

private long readMaxElapsed(AccumuloClient client, ScannerBase.ConsistencyLevel consistency,
String table) throws Exception {
try (var scanner = client.createScanner(table)) {
scanner.setConsistencyLevel(consistency);
scanner.fetchColumn(new Text("elapsed"), new Text("nanos"));
return scanner.stream().mapToLong(e -> Long.parseLong(e.getValue().toString(), 10)).max()
.orElse(-1);
}
}

private static Callable<Void> createWriterTask(AccumuloClient client, String table, Timer timer) {
SecureRandom random = new SecureRandom();
Callable<Void> writerTask = () -> {
try (var writer = client.createBatchWriter(table)) {
while (true) {
var elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
Mutation m = new Mutation(Long.toHexString(random.nextLong()));
m.put("elapsed", "nanos", "" + elapsed);
writer.addMutation(m);
writer.flush();
Thread.sleep(100);
}
}
};
return writerTask;
}
}
