From e228b681d1bf8a984848be3d21bb374e996803c9 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 26 Sep 2024 00:10:55 +0000 Subject: [PATCH] adds ability to construct load plan while writing to rfile --- assemble/pom.xml | 2 +- core/pom.xml | 2 +- .../core/client/rfile/LoadPlanCollector.java | 124 ++++++++++++++++++ .../accumulo/core/client/rfile/RFile.java | 9 ++ .../core/client/rfile/RFileWriter.java | 29 +++- .../core/client/rfile/RFileWriterBuilder.java | 25 +++- .../apache/accumulo/core/data/LoadPlan.java | 32 ++++- .../accumulo/core/file/rfile/RFileTest.java | 1 - hadoop-mapreduce/pom.xml | 2 +- iterator-test-harness/pom.xml | 2 +- minicluster/pom.xml | 2 +- pom.xml | 2 +- server/base/pom.xml | 2 +- server/compaction-coordinator/pom.xml | 2 +- server/compactor/pom.xml | 2 +- server/gc/pom.xml | 2 +- server/manager/pom.xml | 2 +- server/master/pom.xml | 2 +- server/monitor/pom.xml | 2 +- server/native/pom.xml | 2 +- server/tserver/pom.xml | 2 +- shell/pom.xml | 2 +- start/pom.xml | 2 +- test/pom.xml | 2 +- 24 files changed, 229 insertions(+), 27 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java diff --git a/assemble/pom.xml b/assemble/pom.xml index 2d933e582fc..e256cf5ba78 100644 --- a/assemble/pom.xml +++ b/assemble/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 accumulo pom diff --git a/core/pom.xml b/core/pom.xml index 327a02ea42c..df4b24185e8 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 accumulo-core Apache Accumulo Core diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java new file mode 100644 index 00000000000..dfc073add84 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java @@ -0,0 +1,124 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.client.rfile; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +class LoadPlanCollector { + + private final LoadPlan.SplitResolver splitResolver; + private boolean finished = false; + private Text lgFirstRow; + private Text lgLastRow; + private Text firstRow; + private Text lastRow; + private Set overlappingExtents; + private KeyExtent currentExtent; + + LoadPlanCollector(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = splitResolver; + this.overlappingExtents = new HashSet<>(); + } + + LoadPlanCollector() { + splitResolver = null; + this.overlappingExtents = null; + + } + + private void appendNoSplits(Key key) { + if (lgFirstRow == null) { + lgFirstRow = key.getRow(); + lgLastRow = lgFirstRow; + } else { + var row = key.getRow(); + lgLastRow = row; + } + } + + private static final TableId FAKE_ID = TableId.of("123"); + + private void 
appendSplits(Key key) { + var row = key.getRow(); + if (currentExtent == null || !currentExtent.contains(row)) { + var tableSplits = splitResolver.apply(row); + var extent = new KeyExtent(FAKE_ID, tableSplits.getEndRow(), tableSplits.getPrevRow()); + if (currentExtent != null) { + // TODO validate that row is after the currentExtent + overlappingExtents.add(currentExtent); + } + currentExtent = extent; + } + } + + public void append(Key key) { + if (splitResolver == null) { + appendNoSplits(key); + } else { + appendSplits(key); + } + } + + public void startLocalityGroup() { + if (lgFirstRow != null) { + if (firstRow == null) { + firstRow = lgFirstRow; + lastRow = lgLastRow; + } else { + // take the minimum + firstRow = firstRow.compareTo(lgFirstRow) < 0 ? firstRow : lgFirstRow; + // take the maximum + lastRow = lastRow.compareTo(lgLastRow) > 0 ? lastRow : lgLastRow; + } + lgFirstRow = null; + lgLastRow = null; + } + } + + public LoadPlan getLoadPlan(String filename) { + Preconditions.checkState(finished); + if (splitResolver == null) { + return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, firstRow, lastRow) + .build(); + } else { + var builder = LoadPlan.builder(); + overlappingExtents.add(currentExtent); + for (var extent : overlappingExtents) { + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, extent.prevEndRow(), + extent.endRow()); + } + return builder.build(); + } + } + + public void close() { + finished = true; + // compute the overall min and max rows + startLocalityGroup(); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java index 3b6d10aade1..04b7ff4a772 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.client.summary.Summary.FileStatistics; import 
org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.fs.FileSystem; @@ -428,6 +429,14 @@ default WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) */ WriterOptions withVisibilityCacheSize(int maxSize); + /** + * @param splitResolver builds a load plan using table split points provided by the given + * splitResolver. + * @return this + * @see RFileWriter#getLoadPlan(String) + */ + WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver); + /** * @return a new RfileWriter created with the options previously specified. */ diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java index b4d6def4a23..c40bb0894e5 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.security.ColumnVisibility; @@ -92,12 +93,17 @@ public class RFileWriter implements AutoCloseable { private final FileSKVWriter writer; private final LRUMap validVisibilities; + + // TODO should be able to completely remove this as lower level code is already doing some things + // like tracking first and last keys per LG. Added to get simple initial impl before optimizing. 
+ private final LoadPlanCollector loadPlanCollector; private boolean startedLG; private boolean startedDefaultLG; - RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) { + RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize, LoadPlanCollector loadPlanCollector) { this.writer = fileSKVWriter; this.validVisibilities = new LRUMap<>(visCacheSize); + this.loadPlanCollector = loadPlanCollector; } private void _startNewLocalityGroup(String name, Set columnFamilies) @@ -106,6 +112,7 @@ private void _startNewLocalityGroup(String name, Set columnFamilie "Cannot start a locality group after starting the default locality group"); writer.startNewLocalityGroup(name, columnFamilies); startedLG = true; + loadPlanCollector.startLocalityGroup(); } /** @@ -175,6 +182,7 @@ public void startNewLocalityGroup(String name, String... families) throws IOExce public void startDefaultLocalityGroup() throws IOException { Preconditions.checkState(!startedDefaultLG); + loadPlanCollector.startLocalityGroup(); writer.startDefaultLocalityGroup(); startedDefaultLG = true; startedLG = true; @@ -204,6 +212,7 @@ public void append(Key key, Value val) throws IOException { validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); } writer.append(key, val); + loadPlanCollector.append(key); } /** @@ -250,5 +259,23 @@ public void append(Iterable> keyValues) throws IOException { @Override public void close() throws IOException { writer.close(); + loadPlanCollector.close(); + } + + /** + * If no split resolver was provided when the RFileWriter was built then this method will return a + * simple load plan of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using + * the first and last row seen. If a splitResolver was provided then this will return a load plan + * of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} that has the split + * ranges the rows written overlapped. 
+ * + * @param filename name of the file that each range in the returned load plan is mapped to + * @return load plan computed from the keys written to the rfile. + * @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver) + * TODO since tags on all new apis + */ + // TODO test case of empty rfile... and just test this in general + public LoadPlan getLoadPlan(String filename) { + return loadPlanCollector.getLoadPlan(filename); } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java index be1850c8c32..ab99160c240 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.metadata.ValidationUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; @@ -72,6 +73,7 @@ OutputStream getOutputStream() { private int visCacheSize = 1000; private Map samplerProps = Collections.emptyMap(); private Map summarizerProps = Collections.emptyMap(); + private LoadPlan.SplitResolver splitResolver; private void checkDisjoint(Map props, Map derivedProps, String kind) { @@ -106,6 +108,13 @@ public RFileWriter build() throws IOException { CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, tableConfig); + LoadPlanCollector loadPlanCollector; + if (splitResolver != null) { + loadPlanCollector = new LoadPlanCollector(splitResolver); + } else { + loadPlanCollector = new LoadPlanCollector(); + } + if (out.getOutputStream() != null) { FSDataOutputStream fsdo; if (out.getOutputStream() instanceof 
FSDataOutputStream) { @@ -116,11 +125,13 @@ public RFileWriter build() throws IOException { return new RFileWriter( fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf(), cs) .withTableConfiguration(acuconf).withStartDisabled().build(), - visCacheSize); + visCacheSize, loadPlanCollector); } else { - return new RFileWriter(fileops.newWriterBuilder() - .forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs) - .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize); + return new RFileWriter( + fileops.newWriterBuilder() + .forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs) + .withTableConfiguration(acuconf).withStartDisabled().build(), + visCacheSize, loadPlanCollector); } } @@ -172,6 +183,12 @@ public WriterOptions withVisibilityCacheSize(int maxSize) { return this; } + @Override + public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = splitResolver; + return this; + } + @Override public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) { Objects.requireNonNull(summarizerConf); diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 370cb4a4ec6..8abc7a75067 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.SortedSet; import java.util.function.Function; import java.util.stream.Collectors; @@ -298,14 +299,39 @@ public static LoadPlan fromJson(String json) { * Represents two split points that exist in a table being bulk imported to. 
*/ public static class TableSplits { - public final Text prevRow; - public final Text endRow; + private final Text prevRow; + private final Text endRow; public TableSplits(Text prevRow, Text endRow) { - // TODO check order, expect that prevRow < endRow + Preconditions.checkArgument( + prevRow == null || endRow == null || prevRow.compareTo(endRow) < 0, "%s >= %s", prevRow, + endRow); this.prevRow = prevRow; this.endRow = endRow; } + + public Text getPrevRow() { + return prevRow; + } + + public Text getEndRow() { + return endRow; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TableSplits that = (TableSplits) o; + return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow, that.endRow); + } + + @Override + public int hashCode() { + return Objects.hash(prevRow, endRow); + } } /** diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 082e0d6ea7b..876d12a15d6 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -366,7 +366,6 @@ static String formatString(String prefix, int i) { public void test1() throws IOException { // test an empty file - TestRFile trf = new TestRFile(conf); trf.openWriter(); diff --git a/hadoop-mapreduce/pom.xml b/hadoop-mapreduce/pom.xml index bca66f8f624..21b9deee049 100644 --- a/hadoop-mapreduce/pom.xml +++ b/hadoop-mapreduce/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 accumulo-hadoop-mapreduce Apache Accumulo Hadoop MapReduce diff --git a/iterator-test-harness/pom.xml b/iterator-test-harness/pom.xml index f5ad91ebe0c..ffc818df810 100644 --- a/iterator-test-harness/pom.xml +++ b/iterator-test-harness/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 
2.1.4-SNAPSHOT + 2.1.4-4898 accumulo-iterator-test-harness Apache Accumulo Iterator Test Harness diff --git a/minicluster/pom.xml b/minicluster/pom.xml index 4a304429b0b..6672c9fbcde 100644 --- a/minicluster/pom.xml +++ b/minicluster/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 accumulo-minicluster Apache Accumulo MiniCluster diff --git a/pom.xml b/pom.xml index 1faae076330..7d55e875acc 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 pom Apache Accumulo Project Apache Accumulo is a sorted, distributed key/value store based diff --git a/server/base/pom.xml b/server/base/pom.xml index 882b3634d81..3d796b3bf94 100644 --- a/server/base/pom.xml +++ b/server/base/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 ../../pom.xml accumulo-server-base diff --git a/server/compaction-coordinator/pom.xml b/server/compaction-coordinator/pom.xml index 18c8699af10..909b46ada55 100644 --- a/server/compaction-coordinator/pom.xml +++ b/server/compaction-coordinator/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 ../../pom.xml accumulo-compaction-coordinator diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml index dc81eb351c7..d56dd853a85 100644 --- a/server/compactor/pom.xml +++ b/server/compactor/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 ../../pom.xml accumulo-compactor diff --git a/server/gc/pom.xml b/server/gc/pom.xml index 00a091dab0a..92c56e3fd3b 100644 --- a/server/gc/pom.xml +++ b/server/gc/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 ../../pom.xml accumulo-gc diff --git a/server/manager/pom.xml b/server/manager/pom.xml index 20da2677cc9..09410e99d90 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo 
accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 ../../pom.xml accumulo-manager diff --git a/server/master/pom.xml b/server/master/pom.xml index ae3054fb744..266ecd7e61c 100644 --- a/server/master/pom.xml +++ b/server/master/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 ../../pom.xml diff --git a/server/monitor/pom.xml b/server/monitor/pom.xml index cb2e899597a..82ed42bfaeb 100644 --- a/server/monitor/pom.xml +++ b/server/monitor/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 ../../pom.xml accumulo-monitor diff --git a/server/native/pom.xml b/server/native/pom.xml index 1becc630211..1d67875fa9e 100644 --- a/server/native/pom.xml +++ b/server/native/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 ../../pom.xml accumulo-native diff --git a/server/tserver/pom.xml b/server/tserver/pom.xml index 5c850562a42..ac4fca84997 100644 --- a/server/tserver/pom.xml +++ b/server/tserver/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 ../../pom.xml accumulo-tserver diff --git a/shell/pom.xml b/shell/pom.xml index 6cd66a82973..52a3884ce8d 100644 --- a/shell/pom.xml +++ b/shell/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 accumulo-shell Apache Accumulo Shell diff --git a/start/pom.xml b/start/pom.xml index 9b207bd6b16..3559ed5a56d 100644 --- a/start/pom.xml +++ b/start/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 accumulo-start Apache Accumulo Start diff --git a/test/pom.xml b/test/pom.xml index ec590dca0ab..8478b491146 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -24,7 +24,7 @@ org.apache.accumulo accumulo-project - 2.1.4-SNAPSHOT + 2.1.4-4898 accumulo-test Apache Accumulo Testing