Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Aug 30, 2024
1 parent dcf951f commit 9c589cd
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,9 @@ public enum Property {
+ "in-memory map flushed to disk in a minor compaction. There is no guarantee an idle "
+ "tablet will be compacted.",
"1.3.5"),
TABLE_MINC_COMPACT_MAXAGE("table.compaction.minor.maxage", "5m", PropertyType.TIMEDURATION,
"The maximum amount of time new data can be buffered in memory before being flushed to a file. This is useful when using scan servers. TODO better description.",
"3.1.0"),
TABLE_COMPACTION_DISPATCHER("table.compaction.dispatcher",
SimpleCompactionDispatcher.class.getName(), PropertyType.CLASSNAME,
"A configurable dispatcher that decides what compaction service a table should use.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.PreAllocatedArray;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.commons.lang3.mutable.MutableLong;
Expand Down Expand Up @@ -488,6 +489,8 @@ public void mutate(List<Mutation> mutations, int kvCount) {

private final Object writeSerializer = new Object();

private Timer firstWriteTimer = null;

/**
* Applies changes to a row in the InMemoryMap
*
Expand All @@ -501,6 +504,10 @@ public void mutate(List<Mutation> mutations, int numKVs) {
//
// using separate lock from this map, to allow read/write in parallel
synchronized (writeSerializer) {
if (firstWriteTimer == null) {
// TODO can the be moved outside of imm into TabletMemory?
firstWriteTimer = Timer.startNew();
}
int kv = nextKVCount.getAndAdd(numKVs);
try {
map.mutate(mutations, kv);
Expand All @@ -510,6 +517,16 @@ public void mutate(List<Mutation> mutations, int numKVs) {
}
}

public long getElapsedTimeSinceFirstWrite(TimeUnit unit) {
synchronized (writeSerializer) {
if (firstWriteTimer == null) {
return 0;
} else {
return firstWriteTimer.elapsed(unit);
}
}
}

/**
* Returns a long representing the size of the InMemoryMap
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,10 @@ private void manageMemory() {
}
}

public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime,
long mincSize) {
memUsageReports.add(new TabletMemoryReport(tablet, lastCommitTime, size, mincSize));
public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize,
long elapsedMillisSinceFirstWrite) {
memUsageReports.add(new TabletMemoryReport(tablet, lastCommitTime, size, mincSize,
elapsedMillisSinceFirstWrite));
}

public void tabletClosed(KeyExtent extent) {
Expand Down Expand Up @@ -700,7 +701,8 @@ public synchronized ScanFileManager newScanFileManager(ScanDispatch scanDispatch
private final AtomicLong lastReportedMincSize = new AtomicLong();
private volatile long lastReportedCommitTime = 0;

public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) {
public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize,
long elapsedMillisSinceFirstWrite) {

// do not want to update stats for every little change,
// so only do it under certain circumstances... the reason
Expand Down Expand Up @@ -731,7 +733,8 @@ public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) {
}

if (report) {
memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize);
memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize,
elapsedMillisSinceFirstWrite);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ protected long getMinCIdleThreshold(KeyExtent extent) {
return mincIdleThresholds.get(tableId);
}

protected long getMaxAge(KeyExtent extent) {
return context.getTableConfiguration(extent.tableId())
.getTimeInMillis(Property.TABLE_MINC_COMPACT_MAXAGE);
}

protected boolean tableExists(TableId tableId) {
// make sure that the table still exists by checking if it has a configuration
return context.getTableConfiguration(tableId) != null;
Expand Down Expand Up @@ -191,7 +196,8 @@ public List<KeyExtent> tabletsToMinorCompact(List<TabletMemoryReport> tablets) {
TabletInfo tabletInfo = new TabletInfo(tablet, memTabletSize, idleTime, timeMemoryLoad);
try {
// If the table was deleted, getMinCIdleThreshold will throw an exception
if (idleTime > getMinCIdleThreshold(tablet)) {
if (idleTime > getMinCIdleThreshold(tablet)
|| ts.getElapsedMillisSinceFirstWrite() > getMaxAge(tablet)) {
largestIdleMemTablets.put(timeMemoryLoad, tabletInfo);
}
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ public class TabletMemoryReport implements Cloneable {
private final long lastCommitTime;
private final long memTableSize;
private final long minorCompactingMemTableSize;
private final long elapsedMillisSinceFirstWrite;

public TabletMemoryReport(Tablet tablet, long lastCommitTime, long memTableSize,
long minorCompactingMemTableSize) {
long minorCompactingMemTableSize, long elapsedMillisSinceFirstWrite) {
this.tablet = tablet;
this.lastCommitTime = lastCommitTime;
this.memTableSize = memTableSize;
this.minorCompactingMemTableSize = minorCompactingMemTableSize;
this.elapsedMillisSinceFirstWrite = elapsedMillisSinceFirstWrite;
}

public KeyExtent getExtent() {
Expand All @@ -48,6 +50,10 @@ public long getLastCommitTime() {
return lastCommitTime;
}

public long getElapsedMillisSinceFirstWrite() {
return elapsedMillisSinceFirstWrite;
}

public long getMemTableSize() {
return memTableSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2113,8 +2113,8 @@ public Durability getDurability() {
return DurabilityImpl.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY));
}

public void updateMemoryUsageStats(long size, long mincSize) {
getTabletResources().updateMemoryUsageStats(this, size, mincSize);
public void updateMemoryUsageStats(long size, long mincSize, long elapsedMillisSinceFirstWrite) {
getTabletResources().updateMemoryUsageStats(this, size, mincSize, elapsedMillisSinceFirstWrite);
}

public long incrementDataSourceDeletions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
Expand Down Expand Up @@ -79,7 +80,7 @@ public CommitSession prepareForMinC() {
nextSeq += 2;

tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(),
otherMemTable.estimatedSizeInBytes());
otherMemTable.estimatedSizeInBytes(), 0);

return oldCommitSession;
}
Expand Down Expand Up @@ -122,7 +123,8 @@ public void finalizeMinC() {

deletingMemTable = null;

tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0,
memTable.getElapsedTimeSinceFirstWrite(TimeUnit.MILLISECONDS));
}
}
}
Expand Down Expand Up @@ -153,7 +155,8 @@ public void updateMemoryUsageStats() {
other = deletingMemTable.estimatedSizeInBytes();
}

tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other,
memTable.getElapsedTimeSinceFirstWrite(TimeUnit.MILLISECONDS));
}

public List<MemoryIterator> getIterators(SamplerConfigurationImpl samplerConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,9 @@ private static KeyExtent k(String endRow) {
return new KeyExtent(TableId.of("1"), new Text(endRow), null);
}

// TODO test
private TabletMemoryReport t(KeyExtent ke, long lastCommit, long memSize, long compactingSize) {
return new TabletMemoryReport(null, lastCommit, memSize, compactingSize) {
return new TabletMemoryReport(null, lastCommit, memSize, compactingSize, 0) {
@Override
public KeyExtent getExtent() {
return ke;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;

import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;

public class ScanServerMaxLatencyIT extends ConfigurableMacBase {

protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "2s");
}

@Test
public void testMaxLatency() throws Exception {
final String[] tables = this.getUniqueNames(2);
final String table1 = tables[0];
final String table2 = tables[1];

getCluster().getConfig().setNumScanServers(1);
getCluster().getClusterControl().startAllServers(ServerType.SCAN_SERVER);

ExecutorService executor = Executors.newCachedThreadPool();
try (var client = Accumulo.newClient().from(getClientProperties()).build()) {

Wait.waitFor(() -> !client.instanceOperations().getScanServers().isEmpty());

var ntc = new NewTableConfiguration();
ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), "2s"));
client.tableOperations().create(table1, ntc);
client.tableOperations().create(table2);

Timer timer = Timer.startNew();

var future1 = executor.submit(createWriterTask(client, table1, timer));
var future2 = executor.submit(createWriterTask(client, table2, timer));

for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
var eb = timer.elapsed(TimeUnit.MILLISECONDS);
var es = readMaxElapsed(client, table1);
System.out.println("t1 " + es + " " + (eb - es));
System.out.println("t2 " + readMaxElapsed(client, table2));
}
}

}

private long readMaxElapsed(AccumuloClient client, String table) throws Exception {
try (var scanner = client.createScanner(table)) {
scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
scanner.fetchColumn(new Text("elapsed"), new Text("nanos"));
return scanner.stream().mapToLong(e -> Long.parseLong(e.getValue().toString(), 10)).max()
.orElse(-1);
}
}

private static Callable<Void> createWriterTask(AccumuloClient client, String table, Timer timer) {
SecureRandom random = new SecureRandom();
Callable<Void> writerTask = () -> {
try (var writer = client.createBatchWriter(table)) {
while (true) {
var elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
Mutation m = new Mutation(Long.toHexString(random.nextLong()));
m.put("elapsed", "nanos", "" + elapsed);
writer.addMutation(m);
writer.flush();
Thread.sleep(100);
}
}
};
return writerTask;
}

}

0 comments on commit 9c589cd

Please sign in to comment.