Skip to content

Commit

Permalink
adds ability to construct load plan while writing to rfile
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Sep 26, 2024
1 parent 368b2a4 commit e228b68
Show file tree
Hide file tree
Showing 24 changed files with 229 additions and 27 deletions.
2 changes: 1 addition & 1 deletion assemble/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-project</artifactId>
<version>2.1.4-SNAPSHOT</version>
<version>2.1.4-4898</version>
</parent>
<artifactId>accumulo</artifactId>
<packaging>pom</packaging>
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-project</artifactId>
<version>2.1.4-SNAPSHOT</version>
<version>2.1.4-4898</version>
</parent>
<artifactId>accumulo-core</artifactId>
<name>Apache Accumulo Core</name>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.client.rfile;

import java.util.HashSet;
import java.util.Set;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;

/**
 * Observes the keys appended to an rfile and derives a {@link LoadPlan} for that file once
 * writing has finished.
 *
 * <p>
 * The collector works in one of two modes chosen at construction time:
 * <ul>
 * <li>Without a split resolver it tracks the smallest and largest row seen across all locality
 * groups and produces a single {@link LoadPlan.RangeType#FILE} destination.</li>
 * <li>With a split resolver it asks the resolver for the table splits surrounding each row that
 * falls outside the current extent and produces one {@link LoadPlan.RangeType#TABLE} destination
 * per distinct extent.</li>
 * </ul>
 *
 * <p>
 * Expected call order: {@link #append(Key)} / {@link #startLocalityGroup()} any number of times,
 * then {@link #close()}, then {@link #getLoadPlan(String)}.
 */
class LoadPlanCollector {

  // Only used to satisfy the KeyExtent constructor; the id never reaches the load plan because
  // only the extent's prevEndRow and endRow are copied into it.
  private static final TableId FAKE_ID = TableId.of("123");

  // null when no split resolver was supplied (FILE mode)
  private final LoadPlan.SplitResolver splitResolver;
  // extents seen before currentExtent; null in FILE mode
  private final Set<KeyExtent> overlappingExtents;
  private boolean finished = false;
  // first/last row of the locality group currently being written (FILE mode only)
  private Text lgFirstRow;
  private Text lgLastRow;
  // min/max row over all completed locality groups (FILE mode only)
  private Text firstRow;
  private Text lastRow;
  // extent containing the most recently appended row (TABLE mode only)
  private KeyExtent currentExtent;

  /**
   * Creates a collector that builds a {@link LoadPlan.RangeType#TABLE} load plan.
   *
   * @param splitResolver maps a row to the table splits that surround it
   */
  LoadPlanCollector(LoadPlan.SplitResolver splitResolver) {
    this.splitResolver = splitResolver;
    this.overlappingExtents = new HashSet<>();
  }

  /**
   * Creates a collector that builds a {@link LoadPlan.RangeType#FILE} load plan from the first
   * and last row written.
   */
  LoadPlanCollector() {
    this.splitResolver = null;
    this.overlappingExtents = null;
  }

  /** Tracks the first and last row of the current locality group. */
  private void appendNoSplits(Key key) {
    // Key.getRow() is called once per key and its result is stored directly
    Text row = key.getRow();
    if (lgFirstRow == null) {
      lgFirstRow = row;
    }
    lgLastRow = row;
  }

  /** Resolves a new extent whenever the row leaves the extent of the previous row. */
  private void appendSplits(Key key) {
    var row = key.getRow();
    if (currentExtent != null && currentExtent.contains(row)) {
      // still inside the extent resolved for an earlier row, nothing new to record
      return;
    }

    var tableSplits = splitResolver.apply(row);
    var extent = new KeyExtent(FAKE_ID, tableSplits.getEndRow(), tableSplits.getPrevRow());
    // A resolver that returns splits not surrounding the row would otherwise silently produce a
    // load plan that sends the file to the wrong tablets.
    Preconditions.checkState(extent.contains(row),
        "Split resolver returned splits (%s, %s] that do not contain row %s",
        tableSplits.getPrevRow(), tableSplits.getEndRow(), row);

    if (currentExtent != null) {
      overlappingExtents.add(currentExtent);
    }
    currentExtent = extent;
  }

  /**
   * Records a key that was written to the rfile.
   *
   * @param key the key that was appended
   * @throws IllegalStateException if a split resolver is in use and it returned splits that do
   *         not contain the key's row
   */
  public void append(Key key) {
    if (splitResolver == null) {
      appendNoSplits(key);
    } else {
      appendSplits(key);
    }
  }

  /**
   * Folds the first and last row of the locality group that just ended into the overall first
   * and last row, then resets the per-locality-group state. Does nothing when no rows were
   * recorded for the group.
   */
  public void startLocalityGroup() {
    if (lgFirstRow == null) {
      return;
    }

    if (firstRow == null) {
      firstRow = lgFirstRow;
      lastRow = lgLastRow;
    } else {
      // take the minimum
      firstRow = firstRow.compareTo(lgFirstRow) < 0 ? firstRow : lgFirstRow;
      // take the maximum
      lastRow = lastRow.compareTo(lgLastRow) > 0 ? lastRow : lgLastRow;
    }
    lgFirstRow = null;
    lgLastRow = null;
  }

  /**
   * Builds the load plan for the rows observed so far.
   *
   * @param filename the file name to record in every destination of the plan
   * @return a load plan with no destinations when no keys were appended; otherwise a single FILE
   *         destination (no split resolver) or one TABLE destination per overlapped extent
   * @throws IllegalStateException if {@link #close()} has not been called
   */
  public LoadPlan getLoadPlan(String filename) {
    Preconditions.checkState(finished, "Attempted to get load plan before closing");

    if (splitResolver == null) {
      if (firstRow == null) {
        // nothing was written, so there is no row range to describe
        return LoadPlan.builder().build();
      }
      return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, firstRow, lastRow)
          .build();
    }

    if (currentExtent == null) {
      // nothing was written, so no extent was ever resolved
      return LoadPlan.builder().build();
    }

    var builder = LoadPlan.builder();
    // adding to a set keeps repeated calls idempotent
    overlappingExtents.add(currentExtent);
    for (var extent : overlappingExtents) {
      builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, extent.prevEndRow(),
          extent.endRow());
    }
    return builder.build();
  }

  /** Marks collection as finished and rolls up the rows of the last locality group. */
  public void close() {
    finished = true;
    // compute the overall min and max rows
    startLocalityGroup();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.accumulo.core.client.summary.Summary.FileStatistics;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -428,6 +429,14 @@ default WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf)
*/
WriterOptions withVisibilityCacheSize(int maxSize);

/**
 * Sets a split resolver so that the writer can compute a load plan from the table split points
 * that surround the rows written to the rfile.
 *
 * @param splitResolver resolves a row to the table split points surrounding it; used to build
 *        the load plan returned by {@link RFileWriter#getLoadPlan(String)}
 * @return this
 * @see RFileWriter#getLoadPlan(String)
 */
WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver);

/**
* @return a new RFileWriter created with the options previously specified.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.security.ColumnVisibility;
Expand Down Expand Up @@ -92,12 +93,17 @@ public class RFileWriter implements AutoCloseable {

private final FileSKVWriter writer;
private final LRUMap<ByteSequence,Boolean> validVisibilities;

// TODO should be able to completely remove this as lower level code is already doing some things
// like tracking first and last keys per LG. Added to get simple initial impl before optimizing.
private final LoadPlanCollector loadPlanCollector;
private boolean startedLG;
private boolean startedDefaultLG;

RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) {
RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize, LoadPlanCollector loadPlanCollector) {
this.writer = fileSKVWriter;
this.validVisibilities = new LRUMap<>(visCacheSize);
this.loadPlanCollector = loadPlanCollector;
}

private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
Expand All @@ -106,6 +112,7 @@ private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilie
"Cannot start a locality group after starting the default locality group");
writer.startNewLocalityGroup(name, columnFamilies);
startedLG = true;
loadPlanCollector.startLocalityGroup();
}

/**
Expand Down Expand Up @@ -175,6 +182,7 @@ public void startNewLocalityGroup(String name, String... families) throws IOExce

public void startDefaultLocalityGroup() throws IOException {
Preconditions.checkState(!startedDefaultLG);
loadPlanCollector.startLocalityGroup();
writer.startDefaultLocalityGroup();
startedDefaultLG = true;
startedLG = true;
Expand Down Expand Up @@ -204,6 +212,7 @@ public void append(Key key, Value val) throws IOException {
validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE);
}
writer.append(key, val);
loadPlanCollector.append(key);
}

/**
Expand Down Expand Up @@ -250,5 +259,23 @@ public void append(Iterable<Entry<Key,Value>> keyValues) throws IOException {
@Override
public void close() throws IOException {
// Close the underlying file writer first; if it throws, the collector below is never marked
// finished and getLoadPlan(String) will keep failing its state check.
writer.close();
// Marks the load plan collector as finished and rolls up the rows of the last locality group.
loadPlanCollector.close();
}

/**
 * Returns a load plan computed from the keys that were appended to this writer. If no split
 * resolver was provided when the RFileWriter was built then this method will return a simple load
 * plan of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using the first and
 * last row seen. If a splitResolver was provided then this will return a load plan of type
 * {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} containing the table split
 * ranges that the written rows overlapped.
 *
 * @param filename the file name to record in each entry of the returned load plan
 * @return load plan computed from the keys written to the rfile.
 * @throws IllegalStateException if called before this writer has been closed
 * @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver)
 */
// TODO since tags on all new apis
// TODO test case of empty rfile... and just test this in general
public LoadPlan getLoadPlan(String filename) {
return loadPlanCollector.getLoadPlan(filename);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.metadata.ValidationUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
Expand Down Expand Up @@ -72,6 +73,7 @@ OutputStream getOutputStream() {
private int visCacheSize = 1000;
private Map<String,String> samplerProps = Collections.emptyMap();
private Map<String,String> summarizerProps = Collections.emptyMap();
private LoadPlan.SplitResolver splitReolver;

private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps,
String kind) {
Expand Down Expand Up @@ -106,6 +108,13 @@ public RFileWriter build() throws IOException {
CryptoService cs =
CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, tableConfig);

LoadPlanCollector loadPlanCollector;
if (splitReolver != null) {
loadPlanCollector = new LoadPlanCollector(splitReolver);
} else {
loadPlanCollector = new LoadPlanCollector();
}

if (out.getOutputStream() != null) {
FSDataOutputStream fsdo;
if (out.getOutputStream() instanceof FSDataOutputStream) {
Expand All @@ -116,11 +125,13 @@ public RFileWriter build() throws IOException {
return new RFileWriter(
fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(),
visCacheSize);
visCacheSize, loadPlanCollector);
} else {
return new RFileWriter(fileops.newWriterBuilder()
.forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize);
return new RFileWriter(
fileops.newWriterBuilder()
.forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(),
visCacheSize, loadPlanCollector);
}
}

Expand Down Expand Up @@ -172,6 +183,12 @@ public WriterOptions withVisibilityCacheSize(int maxSize) {
return this;
}

@Override
public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver) {
// Remembered until build(), which hands a non-null resolver to the LoadPlanCollector; when the
// field is left null the collector is created without split information.
this.splitReolver = splitResolver;
return this;
}

@Override
public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) {
Objects.requireNonNull(summarizerConf);
Expand Down
32 changes: 29 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedSet;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -298,14 +299,39 @@ public static LoadPlan fromJson(String json) {
* Represents two split points that exist in a table being bulk imported to.
*/
public static class TableSplits {
public final Text prevRow;
public final Text endRow;
private final Text prevRow;
private final Text endRow;

public TableSplits(Text prevRow, Text endRow) {
// TODO check order, expect that prevRow < endRow
Preconditions.checkArgument(
prevRow == null || endRow == null || prevRow.compareTo(endRow) < 0, "%s >= %s", prevRow,
endRow);
this.prevRow = prevRow;
this.endRow = endRow;
}

public Text getPrevRow() {
return prevRow;
}

public Text getEndRow() {
return endRow;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TableSplits that = (TableSplits) o;
return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow, that.endRow);
}

@Override
public int hashCode() {
return Objects.hash(prevRow, endRow);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ static String formatString(String prefix, int i) {
public void test1() throws IOException {

// test an empty file

TestRFile trf = new TestRFile(conf);

trf.openWriter();
Expand Down
2 changes: 1 addition & 1 deletion hadoop-mapreduce/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-project</artifactId>
<version>2.1.4-SNAPSHOT</version>
<version>2.1.4-4898</version>
</parent>
<artifactId>accumulo-hadoop-mapreduce</artifactId>
<name>Apache Accumulo Hadoop MapReduce</name>
Expand Down
2 changes: 1 addition & 1 deletion iterator-test-harness/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-project</artifactId>
<version>2.1.4-SNAPSHOT</version>
<version>2.1.4-4898</version>
</parent>
<artifactId>accumulo-iterator-test-harness</artifactId>
<name>Apache Accumulo Iterator Test Harness</name>
Expand Down
2 changes: 1 addition & 1 deletion minicluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-project</artifactId>
<version>2.1.4-SNAPSHOT</version>
<version>2.1.4-4898</version>
</parent>
<artifactId>accumulo-minicluster</artifactId>
<name>Apache Accumulo MiniCluster</name>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
</parent>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-project</artifactId>
<version>2.1.4-SNAPSHOT</version>
<version>2.1.4-4898</version>
<packaging>pom</packaging>
<name>Apache Accumulo Project</name>
<description>Apache Accumulo is a sorted, distributed key/value store based
Expand Down
Loading

0 comments on commit e228b68

Please sign in to comment.