WIP

keith-turner committed Sep 17, 2024
1 parent b93939d commit a461425

Showing 3 changed files with 168 additions and 6 deletions.
@@ -48,6 +48,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.accumulo.core.Constants;
@@ -324,7 +325,7 @@ public interface KeyExtentCache {
KeyExtent lookup(Text row);
}

-public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache,
+public static List<KeyExtent> findOverlappingTablets(Function<Text,KeyExtent> rowToExtentResolver,
FileSKVIterator reader) throws IOException {

List<KeyExtent> result = new ArrayList<>();
@@ -336,7 +337,7 @@ public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache,
break;
}
row = reader.getTopKey().getRow();
-KeyExtent extent = extentCache.lookup(row);
+KeyExtent extent = rowToExtentResolver.apply(row);
result.add(extent);
row = extent.endRow();
if (row != null) {
@@ -356,13 +357,13 @@ private static Text nextRow(Text row) {
}

public static List<KeyExtent> findOverlappingTablets(ClientContext context,
-KeyExtentCache extentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache,
-CryptoService cs) throws IOException {
+Function<Text,KeyExtent> rowToExtentResolver, Path file, FileSystem fs,
+Cache<String,Long> fileLenCache, CryptoService cs) throws IOException {
try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, fs.getConf(), cs)
.withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
.seekToBeginning().build()) {
-return findOverlappingTablets(extentCache, reader);
+return findOverlappingTablets(rowToExtentResolver, reader);
}
}
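
This hunk swaps the KeyExtentCache parameter for a plain Function<Text,KeyExtent>, so any row-to-extent mapping can drive findOverlappingTablets rather than only the metadata-backed cache. A minimal sketch of a caller under the new contract, assuming a reader and a tableId are in scope (the single-tablet resolver is hypothetical):

// Hypothetical resolver: treat the table as one unsplit tablet, so every row
// resolves to the infinite extent (null, null) and scanning stops after it.
Function<Text,KeyExtent> resolver = row -> new KeyExtent(tableId, null, null);
List<KeyExtent> extents = BulkImport.findOverlappingTablets(resolver, reader);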

@@ -557,7 +558,7 @@ public SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs
try {
long t1 = System.currentTimeMillis();
List<KeyExtent> extents =
-findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs);
+findOverlappingTablets(context, extentCache::lookup, filePath, fs, fileLensCache, cs);
// make sure file isn't going to too many tablets
checkTabletCount(maxTablets, extents.size(), filePath.toString());
Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(), filePath,
core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java (101 additions, 0 deletions)
@@ -20,17 +20,35 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.SortedSet;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
import org.apache.accumulo.core.clientImpl.bulk.BulkImport;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.UnsignedBytes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

@@ -228,4 +246,87 @@ public LoadPlan build() {
}
};
}

private static final TableId FAKE_ID = TableId.of("999");

private static class JsonDestination {
String fileName;
String startRow;
String endRow;
RangeType rangeType;

JsonDestination() {}

JsonDestination(Destination destination) {
fileName = destination.getFileName();
startRow = destination.getStartRow() == null ? null
: Base64.getUrlEncoder().encodeToString(destination.getStartRow());
endRow = destination.getEndRow() == null ? null
: Base64.getUrlEncoder().encodeToString(destination.getEndRow());
rangeType = destination.getRangeType();
}

Destination toDestination() {
return new Destination(fileName, rangeType,
startRow == null ? null : Base64.getUrlDecoder().decode(startRow),
endRow == null ? null : Base64.getUrlDecoder().decode(endRow));
}
}

private static final class JsonAll {
List<JsonDestination> destinations;

JsonAll() {}

JsonAll(List<Destination> destinations) {
this.destinations =
destinations.stream().map(JsonDestination::new).collect(Collectors.toList());
}

}

private static final Gson gson = new GsonBuilder().disableJdkUnsafe().create();

/** Serializes this load plan to JSON. */
public String toJson() {
return gson.toJson(new JsonAll(destinations));
}

/** Deserializes a load plan from the JSON produced by {@link #toJson()}. */
public static LoadPlan fromJson(String json) {
var dests = gson.fromJson(json, JsonAll.class).destinations.stream()
.map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList());
return new LoadPlan(dests);
}
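
Together, toJson and fromJson give load plans a stable wire format in which row boundaries are Base64 url-encoded. A sketch of the intended round trip (the file name and end row are illustrative):

LoadPlan plan = LoadPlan.builder()
.loadFileTo("f1.rf", RangeType.TABLE, null, new Text("0333")).build();
String json = plan.toJson();
// Roughly: {"destinations":[{"fileName":"f1.rf","endRow":"MDMzMw==","rangeType":"TABLE"}]}
// (Gson omits null fields, so the null start row is absent.)
LoadPlan restored = LoadPlan.fromJson(json);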

/** Computes a load plan for a bulk file by mapping its row ranges onto the given split points. */
public static LoadPlan compute(URI file, SortedSet<Text> splits) throws IOException {

// TODO
CryptoService cs = NoCryptoServiceFactory.NONE;
Configuration conf = new Configuration();
Path path = new Path(file);
FileSystem fs = path.getFileSystem(conf);

try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, fs.getConf(), cs)
.withTableConfiguration(DefaultConfiguration.getInstance()).seekToBeginning().build()) {

Function<Text,KeyExtent> rowToExtentResolver = row -> {
var headSet = splits.headSet(row);
Text prevRow = headSet.isEmpty() ? null : headSet.last();
var tailSet = splits.tailSet(row);
Text endRow = tailSet.isEmpty() ? null : tailSet.first();
return new KeyExtent(FAKE_ID, endRow, prevRow);
};

List<KeyExtent> overlapping = BulkImport.findOverlappingTablets(rowToExtentResolver, reader);

var builder = builder();
for (var extent : overlapping) {
builder.loadFileTo(path.getName(), RangeType.TABLE, extent.prevEndRow(), extent.endRow());
}
return builder.build();
}
}
}
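
The resolver in compute leans on SortedSet semantics: headSet(row) is strictly exclusive while tailSet(row) is inclusive, so a row equal to a split point lands in the tablet that ends at that split. A small illustration of the arithmetic (split and row values are arbitrary):

SortedSet<Text> splits = new TreeSet<>(List.of(new Text("0333"), new Text("0666")));
Text row = new Text("0400");
Text prevRow = splits.headSet(row).isEmpty() ? null : splits.headSet(row).last();
Text endRow = splits.tailSet(row).isEmpty() ? null : splits.tailSet(row).first();
// prevRow is 0333 and endRow is 0666, i.e. row 0400 lands in the tablet (0333, 0666].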
@@ -31,11 +31,14 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -88,10 +91,12 @@
import org.apache.accumulo.server.constraints.MetadataConstraints;
import org.apache.accumulo.server.constraints.SystemEnvironment;
import org.apache.accumulo.test.util.Wait;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
@@ -476,6 +481,61 @@ public void testBadLoadPlans() throws Exception {
}
}

@Test
public void testComputeLoadPlan() throws Exception {

try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
addSplits(c, tableName, "0333 0666 0999 1333 1666");

String dir = getDir("/testBulkFile-");

Map<String,Set<String>> hashes = new HashMap<>();
String h1 = writeData(dir + "/f1.", aconf, 0, 333);
hashes.put("0333", new HashSet<>(List.of(h1)));
String h2 = writeData(dir + "/f2.", aconf, 0, 666);
hashes.get("0333").add(h2);
hashes.put("0666", new HashSet<>(List.of(h2)));
String h3 = writeData(dir + "/f3.", aconf, 334, 700);
hashes.get("0666").add(h3);
hashes.put("0999", new HashSet<>(List.of(h3)));

SortedSet<Text> splits = new TreeSet<>(c.tableOperations().listSplits(tableName));

List<String> loadPlans = new ArrayList<>();

for (String filename : List.of("f1.rf", "f2.rf", "f3.rf")) {
// This simulates what each reducer would do
Path path = new Path(dir + "/" + filename);

// compute the load plan for the rfile
String lpJson = LoadPlan.compute(path.toUri(), splits).toJson();

// save the load plan to a file
Path lpPath = new Path(path.getParent(), path.getName().replace(".rf", ".lp"));
try (var output = getCluster().getFileSystem().create(lpPath, false)) {
IOUtils.write(lpJson, output, UTF_8);
}
}

// This simulates the code that would run after the map reduce job and bulk import the files
var builder = LoadPlan.builder();
for (var status : getCluster().getFileSystem().listStatus(new Path(dir), p -> p.getName().endsWith(".lp"))) {
try (var input = getCluster().getFileSystem().open(status.getPath())) {
String lpJson = IOUtils.toString(input, UTF_8);
builder.addPlan(LoadPlan.fromJson(lpJson));
}
}
LoadPlan lpAll = builder.build();

c.tableOperations().importDirectory(dir).to(tableName).plan(lpAll).load();

verifyData(c, tableName, 0, 700, false);
verifyMetadata(c, tableName, hashes);
}

}

@Test
public void testEmptyDir() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
