Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Sep 3, 2024
1 parent 45f6fbf commit f231eb4
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 55 deletions.
13 changes: 9 additions & 4 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -908,12 +908,17 @@ public enum Property {
TABLE_MAX_END_ROW_SIZE("table.split.endrow.size.max", "10k", PropertyType.BYTES,
"Maximum size of end row.", "1.7.0"),
TABLE_MINC_COMPACT_IDLETIME("table.compaction.minor.idle", "5m", PropertyType.TIMEDURATION,
"After a tablet has been idle (no mutations) for this time period it may have its "
+ "in-memory map flushed to disk in a minor compaction. There is no guarantee an idle "
+ "tablet will be compacted.",
"When the time since the last write to a tablets in memory map exceeds this threshold. "
+ " a minor compaction may be initiated. There is no guarantee an idle tablet will be compacted.",
"1.3.5"),
TABLE_MINC_COMPACT_MAXAGE("table.compaction.minor.age", "365000d", PropertyType.TIMEDURATION,
"The maximum amount of time new data can be buffered in memory before being flushed to a file. This is useful when using scan servers. TODO better description, first write vs last write.",
"When the time since the first write to a tablets in memory map exceeds this threshold "
+ "a minor compaction may be initiated. This determines the maximum amount of time new data can "
+ "be buffered in memory before being flushed to a file. This is useful when using scan servers "
+ "in conjunction with the property " + SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey()
+ ". These two properties together can be used to control that amount of time it takes for a scan "
+ "server to see a write to a tablet server.. The default value of this property is set to such a "
+ "high value that is should never cause a minor compaction.",
"3.1.0"),
TABLE_COMPACTION_DISPATCHER("table.compaction.dispatcher",
SimpleCompactionDispatcher.class.getName(), PropertyType.CLASSNAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.PreAllocatedArray;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.commons.lang3.mutable.MutableLong;
Expand Down Expand Up @@ -489,8 +488,6 @@ public void mutate(List<Mutation> mutations, int kvCount) {

private final Object writeSerializer = new Object();

private Timer firstWriteTimer = null;

/**
* Applies changes to a row in the InMemoryMap
*
Expand All @@ -504,10 +501,6 @@ public void mutate(List<Mutation> mutations, int numKVs) {
//
// using separate lock from this map, to allow read/write in parallel
synchronized (writeSerializer) {
if (firstWriteTimer == null) {
// TODO can the be moved outside of imm into TabletMemory?
firstWriteTimer = Timer.startNew();
}
int kv = nextKVCount.getAndAdd(numKVs);
try {
map.mutate(mutations, kv);
Expand All @@ -517,16 +510,6 @@ public void mutate(List<Mutation> mutations, int numKVs) {
}
}

public long getElapsedTimeSinceFirstWrite(TimeUnit unit) {
synchronized (writeSerializer) {
if (firstWriteTimer == null) {
return 0;
} else {
return firstWriteTimer.elapsed(unit);
}
}
}

/**
* Returns a long representing the size of the InMemoryMap
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_TABLET_MIGRATION_POOL;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -55,6 +56,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
Expand All @@ -78,6 +80,7 @@
import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
Expand Down Expand Up @@ -587,9 +590,9 @@ private void manageMemory() {
}

public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize,
long elapsedMillisSinceFirstWrite) {
memUsageReports.add(new TabletMemoryReport(tablet, lastCommitTime, size, mincSize,
elapsedMillisSinceFirstWrite));
Duration elapsedSinceFirstWrite) {
memUsageReports.add(
new TabletMemoryReport(tablet, lastCommitTime, size, mincSize, elapsedSinceFirstWrite));
}

public void tabletClosed(KeyExtent extent) {
Expand Down Expand Up @@ -699,10 +702,10 @@ public synchronized ScanFileManager newScanFileManager(ScanDispatch scanDispatch

private final AtomicLong lastReportedSize = new AtomicLong();
private final AtomicLong lastReportedMincSize = new AtomicLong();
private final AtomicReference<Timer> firstReportedCommitTimer = new AtomicReference<>(null);
private volatile long lastReportedCommitTime = 0;

public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize,
long elapsedMillisSinceFirstWrite) {
public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) {

// do not want to update stats for every little change,
// so only do it under certain circumstances... the reason
Expand All @@ -724,6 +727,17 @@ public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize,
}

long currentTime = System.currentTimeMillis();

if (size == 0) {
// when a new in memory map is created this method is called with a size of zero so use that
// to reset the first write timer
firstReportedCommitTimer.set(null);
} else if (firstReportedCommitTimer.get() == null) {
// this is the first time a non zero size was seen for this in memory map so consider this
// the time of the first write
firstReportedCommitTimer.compareAndSet(null, Timer.startNew());
}

if ((delta > 32000 || delta < 0 || (currentTime - lastReportedCommitTime > 1000))
&& lastReportedSize.compareAndSet(lrs, totalSize)) {
if (delta > 0) {
Expand All @@ -733,8 +747,13 @@ public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize,
}

if (report) {
Duration elapsedSinceFirstWrite = Duration.ZERO;
Timer localTimer = firstReportedCommitTimer.get();
if (localTimer != null) {
elapsedSinceFirstWrite = localTimer.elapsed();
}
memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize,
elapsedMillisSinceFirstWrite);
elapsedSinceFirstWrite);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class LargestFirstMemoryManager {
private double compactionThreshold;
private long maxObserved;
private final HashMap<TableId,Long> mincIdleThresholds = new HashMap<>();
private final HashMap<TableId,Long> mincAgeThresholds = new HashMap<>();
private ServerContext context = null;

private static class TabletInfo {
Expand Down Expand Up @@ -146,8 +147,9 @@ protected long getMinCIdleThreshold(KeyExtent extent) {
}

protected long getMaxAge(KeyExtent extent) {
return context.getTableConfiguration(extent.tableId())
.getTimeInMillis(Property.TABLE_MINC_COMPACT_MAXAGE);
TableId tableId = extent.tableId();
return mincAgeThresholds.computeIfAbsent(tableId, tid -> context.getTableConfiguration(tid)
.getTimeInMillis(Property.TABLE_MINC_COMPACT_MAXAGE));
}

protected boolean tableExists(TableId tableId) {
Expand Down Expand Up @@ -197,7 +199,7 @@ public List<KeyExtent> tabletsToMinorCompact(List<TabletMemoryReport> tablets) {
try {
// If the table was deleted, getMinCIdleThreshold will throw an exception
if (idleTime > getMinCIdleThreshold(tablet)
|| ts.getElapsedMillisSinceFirstWrite() > getMaxAge(tablet)) {
|| ts.getElapsedSinceFirstWrite().toMillis() > getMaxAge(tablet)) {
largestIdleMemTablets.put(timeMemoryLoad, tabletInfo);
}
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.accumulo.tserver.memory;

import java.time.Duration;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.tserver.tablet.Tablet;

Expand All @@ -27,15 +29,15 @@ public class TabletMemoryReport implements Cloneable {
private final long lastCommitTime;
private final long memTableSize;
private final long minorCompactingMemTableSize;
private final long elapsedMillisSinceFirstWrite;
private final Duration elapsedSinceFirstWrite;

public TabletMemoryReport(Tablet tablet, long lastCommitTime, long memTableSize,
long minorCompactingMemTableSize, long elapsedMillisSinceFirstWrite) {
long minorCompactingMemTableSize, Duration elapsedSinceFirstWrite) {
this.tablet = tablet;
this.lastCommitTime = lastCommitTime;
this.memTableSize = memTableSize;
this.minorCompactingMemTableSize = minorCompactingMemTableSize;
this.elapsedMillisSinceFirstWrite = elapsedMillisSinceFirstWrite;
this.elapsedSinceFirstWrite = elapsedSinceFirstWrite;
}

public KeyExtent getExtent() {
Expand All @@ -50,8 +52,8 @@ public long getLastCommitTime() {
return lastCommitTime;
}

public long getElapsedMillisSinceFirstWrite() {
return elapsedMillisSinceFirstWrite;
public Duration getElapsedSinceFirstWrite() {
return elapsedSinceFirstWrite;
}

public long getMemTableSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2113,8 +2113,8 @@ public Durability getDurability() {
return DurabilityImpl.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY));
}

public void updateMemoryUsageStats(long size, long mincSize, long elapsedMillisSinceFirstWrite) {
getTabletResources().updateMemoryUsageStats(this, size, mincSize, elapsedMillisSinceFirstWrite);
public void updateMemoryUsageStats(long size, long mincSize) {
getTabletResources().updateMemoryUsageStats(this, size, mincSize);
}

public long incrementDataSourceDeletions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
Expand Down Expand Up @@ -80,7 +79,7 @@ public CommitSession prepareForMinC() {
nextSeq += 2;

tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(),
otherMemTable.estimatedSizeInBytes(), 0);
otherMemTable.estimatedSizeInBytes());

return oldCommitSession;
}
Expand Down Expand Up @@ -123,8 +122,7 @@ public void finalizeMinC() {

deletingMemTable = null;

tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0,
memTable.getElapsedTimeSinceFirstWrite(TimeUnit.MILLISECONDS));
tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
}
}
}
Expand Down Expand Up @@ -155,8 +153,7 @@ public void updateMemoryUsageStats() {
other = deletingMemTable.estimatedSizeInBytes();
}

tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other,
memTable.getElapsedTimeSinceFirstWrite(TimeUnit.MILLISECONDS));
tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
}

public List<MemoryIterator> getIterators(SamplerConfigurationImpl samplerConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.easymock.EasyMock.replay;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
Expand Down Expand Up @@ -194,6 +195,25 @@ public void testDeletedTable() {
assertEquals(extent, tabletsToMinorCompact.get(0));
}

@Test
public void testMaxAge() {
LargestFirstMemoryManagerUnderTest mgr = new LargestFirstMemoryManagerUnderTest();
mgr.init(context);
List<KeyExtent> tabletsToMinorCompact;

// nothing to do
tabletsToMinorCompact =
mgr.tabletsToMinorCompact(tablets(t(k("x"), ZERO, 1000, 0), t(k("y"), ZERO, 2000, 0)));
assertEquals(0, tabletsToMinorCompact.size());

// a tablet that exceeds the configured max age should need to compact
tabletsToMinorCompact =
mgr.tabletsToMinorCompact(tablets(t(k("x"), ZERO, 1000, 0, Duration.ofMinutes(14)),
t(k("y"), ZERO, 2000, 0, Duration.ofMinutes(16))));
assertEquals(1, tabletsToMinorCompact.size());
assertEquals(k("y"), tabletsToMinorCompact.get(0));
}

private static class LargestFirstMemoryManagerUnderTest extends LargestFirstMemoryManager {

public long currentTime = ZERO;
Expand All @@ -208,6 +228,11 @@ protected long getMinCIdleThreshold(KeyExtent extent) {
return MINUTES.toMillis(15);
}

@Override
protected long getMaxAge(KeyExtent extent) {
return MINUTES.toMillis(15);
}

@Override
protected boolean tableExists(TableId tableId) {
return true;
Expand Down Expand Up @@ -246,9 +271,19 @@ private static KeyExtent k(String endRow) {
return new KeyExtent(TableId.of("1"), new Text(endRow), null);
}

// TODO test
private TabletMemoryReport t(KeyExtent ke, long lastCommit, long memSize, long compactingSize) {
return new TabletMemoryReport(null, lastCommit, memSize, compactingSize, 0) {
return new TabletMemoryReport(null, lastCommit, memSize, compactingSize, Duration.ZERO) {
@Override
public KeyExtent getExtent() {
return ke;
}
};
}

private TabletMemoryReport t(KeyExtent ke, long lastCommit, long memSize, long compactingSize,
Duration elapsedSinceFirstWrite) {
return new TabletMemoryReport(null, lastCommit, memSize, compactingSize,
elapsedSinceFirstWrite) {
@Override
public KeyExtent getExtent() {
return ke;
Expand Down
Loading

0 comments on commit f231eb4

Please sign in to comment.