From 9c589cd02520e801394adf24d763bf1401e9e287 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 30 Aug 2024 23:39:51 +0000 Subject: [PATCH] WIP --- .../apache/accumulo/core/conf/Property.java | 3 + .../apache/accumulo/tserver/InMemoryMap.java | 17 +++ .../tserver/TabletServerResourceManager.java | 13 ++- .../memory/LargestFirstMemoryManager.java | 8 +- .../tserver/memory/TabletMemoryReport.java | 8 +- .../accumulo/tserver/tablet/Tablet.java | 4 +- .../accumulo/tserver/tablet/TabletMemory.java | 9 +- .../memory/LargestFirstMemoryManagerTest.java | 3 +- .../accumulo/test/ScanServerMaxLatencyIT.java | 110 ++++++++++++++++++ 9 files changed, 162 insertions(+), 13 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index c69971e3d72..b4c057ecab3 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -912,6 +912,9 @@ public enum Property { + "in-memory map flushed to disk in a minor compaction. There is no guarantee an idle " + "tablet will be compacted.", "1.3.5"), + TABLE_MINC_COMPACT_MAXAGE("table.compaction.minor.maxage", "5m", PropertyType.TIMEDURATION, + "The maximum amount of time new data can be buffered in memory before being flushed to a file. This is useful when using scan servers. 
TODO better description.", + "3.1.0"), TABLE_COMPACTION_DISPATCHER("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), PropertyType.CLASSNAME, "A configurable dispatcher that decides what compaction service a table should use.", diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index d943e3f6f85..7dc2066fbfc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -70,6 +70,7 @@ import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.PreAllocatedArray; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.commons.lang3.mutable.MutableLong; @@ -488,6 +489,8 @@ public void mutate(List mutations, int kvCount) { private final Object writeSerializer = new Object(); + private Timer firstWriteTimer = null; + /** * Applies changes to a row in the InMemoryMap * @@ -501,6 +504,10 @@ public void mutate(List mutations, int numKVs) { // // using separate lock from this map, to allow read/write in parallel synchronized (writeSerializer) { + if (firstWriteTimer == null) { + // TODO can this be moved outside of imm into TabletMemory? 
+ firstWriteTimer = Timer.startNew(); + } int kv = nextKVCount.getAndAdd(numKVs); try { map.mutate(mutations, kv); @@ -510,6 +517,16 @@ public void mutate(List mutations, int numKVs) { } } + public long getElapsedTimeSinceFirstWrite(TimeUnit unit) { + synchronized (writeSerializer) { + if (firstWriteTimer == null) { + return 0; + } else { + return firstWriteTimer.elapsed(unit); + } + } + } + /** * Returns a long representing the size of the InMemoryMap * diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index eac4e3064b3..de0f323956f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -586,9 +586,10 @@ private void manageMemory() { } } - public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, - long mincSize) { - memUsageReports.add(new TabletMemoryReport(tablet, lastCommitTime, size, mincSize)); + public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize, + long elapsedMillisSinceFirstWrite) { + memUsageReports.add(new TabletMemoryReport(tablet, lastCommitTime, size, mincSize, + elapsedMillisSinceFirstWrite)); } public void tabletClosed(KeyExtent extent) { @@ -700,7 +701,8 @@ public synchronized ScanFileManager newScanFileManager(ScanDispatch scanDispatch private final AtomicLong lastReportedMincSize = new AtomicLong(); private volatile long lastReportedCommitTime = 0; - public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) { + public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize, + long elapsedMillisSinceFirstWrite) { // do not want to update stats for every little change, // so only do it under certain circumstances... 
the reason @@ -731,7 +733,8 @@ public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) { } if (report) { - memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize); + memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize, + elapsedMillisSinceFirstWrite); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java index 40a85110fe0..5a58d378fe0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java @@ -145,6 +145,11 @@ protected long getMinCIdleThreshold(KeyExtent extent) { return mincIdleThresholds.get(tableId); } + protected long getMaxAge(KeyExtent extent) { + return context.getTableConfiguration(extent.tableId()) + .getTimeInMillis(Property.TABLE_MINC_COMPACT_MAXAGE); + } + protected boolean tableExists(TableId tableId) { // make sure that the table still exists by checking if it has a configuration return context.getTableConfiguration(tableId) != null; @@ -191,7 +196,8 @@ public List tabletsToMinorCompact(List tablets) { TabletInfo tabletInfo = new TabletInfo(tablet, memTabletSize, idleTime, timeMemoryLoad); try { // If the table was deleted, getMinCIdleThreshold will throw an exception - if (idleTime > getMinCIdleThreshold(tablet)) { + if (idleTime > getMinCIdleThreshold(tablet) + || ts.getElapsedMillisSinceFirstWrite() > getMaxAge(tablet)) { largestIdleMemTablets.put(timeMemoryLoad, tabletInfo); } } catch (IllegalArgumentException e) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/TabletMemoryReport.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/TabletMemoryReport.java index 4d4ff47cc25..6b2f8a9d165 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/TabletMemoryReport.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/TabletMemoryReport.java @@ -27,13 +27,15 @@ public class TabletMemoryReport implements Cloneable { private final long lastCommitTime; private final long memTableSize; private final long minorCompactingMemTableSize; + private final long elapsedMillisSinceFirstWrite; public TabletMemoryReport(Tablet tablet, long lastCommitTime, long memTableSize, - long minorCompactingMemTableSize) { + long minorCompactingMemTableSize, long elapsedMillisSinceFirstWrite) { this.tablet = tablet; this.lastCommitTime = lastCommitTime; this.memTableSize = memTableSize; this.minorCompactingMemTableSize = minorCompactingMemTableSize; + this.elapsedMillisSinceFirstWrite = elapsedMillisSinceFirstWrite; } public KeyExtent getExtent() { @@ -48,6 +50,10 @@ public long getLastCommitTime() { return lastCommitTime; } + public long getElapsedMillisSinceFirstWrite() { + return elapsedMillisSinceFirstWrite; + } + public long getMemTableSize() { return memTableSize; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 0193b46c93e..134bf601d72 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -2113,8 +2113,8 @@ public Durability getDurability() { return DurabilityImpl.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY)); } - public void updateMemoryUsageStats(long size, long mincSize) { - getTabletResources().updateMemoryUsageStats(this, size, mincSize); + public void updateMemoryUsageStats(long size, long mincSize, long elapsedMillisSinceFirstWrite) { + getTabletResources().updateMemoryUsageStats(this, size, mincSize, elapsedMillisSinceFirstWrite); } public long 
incrementDataSourceDeletions() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java index f5b81ff8a79..75f0dee6716 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; @@ -79,7 +80,7 @@ public CommitSession prepareForMinC() { nextSeq += 2; tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), - otherMemTable.estimatedSizeInBytes()); + otherMemTable.estimatedSizeInBytes(), 0); return oldCommitSession; } @@ -122,7 +123,8 @@ public void finalizeMinC() { deletingMemTable = null; - tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0); + tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0, + memTable.getElapsedTimeSinceFirstWrite(TimeUnit.MILLISECONDS)); } } } @@ -153,7 +155,8 @@ public void updateMemoryUsageStats() { other = deletingMemTable.estimatedSizeInBytes(); } - tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other); + tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other, + memTable.getElapsedTimeSinceFirstWrite(TimeUnit.MILLISECONDS)); } public List getIterators(SamplerConfigurationImpl samplerConfig) { diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManagerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManagerTest.java index db5d83cb0b0..e95cf064200 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManagerTest.java +++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManagerTest.java @@ -246,8 +246,9 @@ private static KeyExtent k(String endRow) { return new KeyExtent(TableId.of("1"), new Text(endRow), null); } + // TODO test private TabletMemoryReport t(KeyExtent ke, long lastCommit, long memSize, long compactingSize) { - return new TabletMemoryReport(null, lastCommit, memSize, compactingSize) { + return new TabletMemoryReport(null, lastCommit, memSize, compactingSize, 0) { @Override public KeyExtent getExtent() { return ke; diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java new file mode 100644 index 00000000000..47f9ed68c13 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test; + +import java.security.SecureRandom; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class ScanServerMaxLatencyIT extends ConfigurableMacBase { + + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "2s"); + } + + @Test + public void testMaxLatency() throws Exception { + final String[] tables = this.getUniqueNames(2); + final String table1 = tables[0]; + final String table2 = tables[1]; + + getCluster().getConfig().setNumScanServers(1); + getCluster().getClusterControl().startAllServers(ServerType.SCAN_SERVER); + + ExecutorService executor = Executors.newCachedThreadPool(); + try (var client = Accumulo.newClient().from(getClientProperties()).build()) { + + Wait.waitFor(() -> !client.instanceOperations().getScanServers().isEmpty()); + + var ntc = new NewTableConfiguration(); + ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), "2s")); + client.tableOperations().create(table1, ntc); + client.tableOperations().create(table2); + + Timer timer = 
Timer.startNew(); + + var future1 = executor.submit(createWriterTask(client, table1, timer)); + var future2 = executor.submit(createWriterTask(client, table2, timer)); + + for (int i = 0; i < 20; i++) { + Thread.sleep(1000); + var eb = timer.elapsed(TimeUnit.MILLISECONDS); + var es = readMaxElapsed(client, table1); + System.out.println("t1 " + es + " " + (eb - es)); + System.out.println("t2 " + readMaxElapsed(client, table2)); + } + } + + } + + private long readMaxElapsed(AccumuloClient client, String table) throws Exception { + try (var scanner = client.createScanner(table)) { + scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL); + scanner.fetchColumn(new Text("elapsed"), new Text("nanos")); + return scanner.stream().mapToLong(e -> Long.parseLong(e.getValue().toString(), 10)).max() + .orElse(-1); + } + } + + private static Callable createWriterTask(AccumuloClient client, String table, Timer timer) { + SecureRandom random = new SecureRandom(); + Callable writerTask = () -> { + try (var writer = client.createBatchWriter(table)) { + while (true) { + var elapsed = timer.elapsed(TimeUnit.MILLISECONDS); + Mutation m = new Mutation(Long.toHexString(random.nextLong())); + m.put("elapsed", "nanos", "" + elapsed); + writer.addMutation(m); + writer.flush(); + Thread.sleep(100); + } + } + }; + return writerTask; + } + +}