prototype of bulk import v2 distributed file examination
This is a prototype of a few new APIs that allow distributing the
examination of files for bulk import.

For a given bulk import directory with N files this would support a use
case like the following.

 1. For each file a task is spun up on a remote server that calls the new
    LoadPlan.compute() API to determine which tablets the file overlaps.
    Then the new LoadPlan.toJson() method is called to serialize the
    load plan so it can be sent to a central place (see the sketch after
    this list).
 2. All the load plans from the remote servers are deserialized by calling
    the new LoadPlan.fromJson() method and merged into a single load
    plan that is used to do the bulk import.
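
As a rough sketch (not part of this change), the per-file task could boil
down to something like the following; how the table splits reach the task
and how the returned JSON gets back to a central place are assumptions
left out of the sketch.

    import java.io.IOException;
    import java.net.URI;
    import java.util.SortedSet;

    import org.apache.accumulo.core.data.LoadPlan;
    import org.apache.hadoop.io.Text;

    // hypothetical task run on a remote server for one bulk import file
    static String examineFile(URI rfile, SortedSet<Text> tableSplits) throws IOException {
      // determine which tablets the file overlaps, relative to the table's splits
      LoadPlan plan = LoadPlan.compute(rfile, tableSplits);
      // serialize the plan so it can be shipped to a central place and merged later
      return plan.toJson();
    }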

Another use case these new APIs could support is running this new code
in the map reduce job that generates bulk import data.

  1. After each reducer produces an rfile, it could call the new
     LoadPlan.compute(), then call LoadPlan.toJson() and save the
     result to a file.  So after the map reduce job completes, each
     rfile would have a corresponding file with a load plan for that
     file.
  2. Another process that runs after the map reduce job can load all the
     load plans from files and merge them using the new
     LoadPlan.fromJson() method (see the sketch after this list).  Then
     the merged LoadPlan can be used to do the bulk import.
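
A hedged sketch of that merge step follows; it assumes the per-file plans
were saved next to the rfiles with a ".lp" suffix, as the new
BulkNewIT.testComputeLoadPlan() test does, and it is not part of this
change.

    import static java.nio.charset.StandardCharsets.UTF_8;

    import java.io.IOException;

    import org.apache.accumulo.core.data.LoadPlan;
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // hypothetical post-job step that merges all per-file load plans
    static LoadPlan mergePlans(FileSystem fs, Path bulkDir) throws IOException {
      var builder = LoadPlan.builder();
      for (FileStatus status : fs.listStatus(bulkDir, p -> p.getName().endsWith(".lp"))) {
        try (var in = fs.open(status.getPath())) {
          // deserialize each per-file plan and add it to the combined plan
          builder.addPlan(LoadPlan.fromJson(IOUtils.toString(in, UTF_8)));
        }
      }
      return builder.build();
    }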

BulkNewIT.testComputeLoadPlan() simulates this map reduce use case by
going through, in code, the steps a map reduce job would take.  This
tests the new APIs and shows what using them would look like.

Both of these use cases avoid analyzing the files on the single machine
that drives the bulk import.  Bulk import v1 had distributed analysis and
would ask random tservers to analyze the files, which could cause
unexpected load on those tservers.  Bulk v1 also interleaved analyzing
files and adding them to tablets, which could lead to odd situations
where analysis fails after a file had already been added to some
tablets, leaving that file partially imported.  Bulk v2 does all
analysis before any files are added to tablets, however it lacks this
distributed analysis capability.  This is an initial attempt to offer
that functionality in bulk v2.
keith-turner committed Sep 17, 2024
1 parent b93939d · commit aa593ad
Showing 3 changed files with 170 additions and 7 deletions.
core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -48,6 +48,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.accumulo.core.Constants;
@@ -324,7 +325,7 @@ public interface KeyExtentCache {
KeyExtent lookup(Text row);
}

public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache,
public static List<KeyExtent> findOverlappingTablets(Function<Text,KeyExtent> rowToExtentResolver,
FileSKVIterator reader) throws IOException {

List<KeyExtent> result = new ArrayList<>();
@@ -336,7 +337,7 @@ public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache,
break;
}
row = reader.getTopKey().getRow();
KeyExtent extent = extentCache.lookup(row);
KeyExtent extent = rowToExtentResolver.apply(row);
result.add(extent);
row = extent.endRow();
if (row != null) {
@@ -356,13 +357,13 @@ private static Text nextRow(Text row) {
}

public static List<KeyExtent> findOverlappingTablets(ClientContext context,
KeyExtentCache extentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache,
CryptoService cs) throws IOException {
Function<Text,KeyExtent> rowToExtentResolver, Path file, FileSystem fs,
Cache<String,Long> fileLenCache, CryptoService cs) throws IOException {
try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, fs.getConf(), cs)
.withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
.seekToBeginning().build()) {
return findOverlappingTablets(extentCache, reader);
return findOverlappingTablets(rowToExtentResolver, reader);
}
}

@@ -557,7 +558,7 @@ public SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs
try {
long t1 = System.currentTimeMillis();
List<KeyExtent> extents =
findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs);
findOverlappingTablets(context, extentCache::lookup, filePath, fs, fileLensCache, cs);
// make sure file isn't going to too many tablets
checkTabletCount(maxTablets, extents.size(), filePath.toString());
Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(), filePath,
102 changes: 102 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
@@ -20,17 +20,35 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.SortedSet;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
import org.apache.accumulo.core.clientImpl.bulk.BulkImport;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.UnsignedBytes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

@@ -228,4 +246,88 @@ public LoadPlan build() {
}
};
}

private static final TableId FAKE_ID = TableId.of("999");

private static class JsonDestination {
String fileName;
String startRow;
String endRow;
RangeType rangeType;

JsonDestination() {}

JsonDestination(Destination destination) {
fileName = destination.getFileName();
startRow = destination.getStartRow() == null ? null
: Base64.getUrlEncoder().encodeToString(destination.getStartRow());
endRow = destination.getEndRow() == null ? null
: Base64.getUrlEncoder().encodeToString(destination.getEndRow());
rangeType = destination.getRangeType();
}

Destination toDestination() {
return new Destination(fileName, rangeType,
startRow == null ? null : Base64.getUrlDecoder().decode(startRow),
endRow == null ? null : Base64.getUrlDecoder().decode(endRow));
}
}

private static final class JsonAll {
List<JsonDestination> destinations;

JsonAll() {}

JsonAll(List<Destination> destinations) {
this.destinations =
destinations.stream().map(JsonDestination::new).collect(Collectors.toList());
}

}

private static final Gson gson = new GsonBuilder().disableJdkUnsafe().create();

// TODO javadoc
public String toJson() {
return gson.toJson(new JsonAll(destinations));
}

// TODO javadoc
public static LoadPlan fromJson(String json) {
var dests = gson.fromJson(json, JsonAll.class).destinations.stream()
.map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList());
return new LoadPlan(dests);
}

// TODO javadoc
public static LoadPlan compute(URI file, SortedSet<Text> splits) throws IOException {

// TODO if the files need a crypto service, how could it be instantiated? Was trying to make
// this method independent of a ClientContext or ServerContext object.
CryptoService cs = NoCryptoServiceFactory.NONE;
Configuration conf = new Configuration();
Path path = new Path(file);
FileSystem fs = path.getFileSystem(conf);

try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, fs.getConf(), cs)
.withTableConfiguration(DefaultConfiguration.getInstance()).seekToBeginning().build()) {

Function<Text,KeyExtent> rowToExtentResolver = row -> {
var headSet = splits.headSet(row);
Text prevRow = headSet.isEmpty() ? null : headSet.last();
var tailSet = splits.tailSet(row);
Text endRow = tailSet.isEmpty() ? null : tailSet.first();
return new KeyExtent(FAKE_ID, endRow, prevRow);
};

List<KeyExtent> overlapping = BulkImport.findOverlappingTablets(rowToExtentResolver, reader);

var builder = builder();
for (var extent : overlapping) {
builder.loadFileTo(path.getName(), RangeType.TABLE, extent.prevEndRow(), extent.endRow());
}
return builder.build();
}
}
}
BulkNewIT.java
@@ -36,6 +36,7 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -88,6 +89,7 @@
import org.apache.accumulo.server.constraints.MetadataConstraints;
import org.apache.accumulo.server.constraints.SystemEnvironment;
import org.apache.accumulo.test.util.Wait;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -476,6 +478,64 @@ public void testBadLoadPlans() throws Exception {
}
}

@Test
public void testComputeLoadPlan() throws Exception {

try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
addSplits(c, tableName, "0333 0666 0999 1333 1666");

String dir = getDir("/testBulkFile-");

Map<String,Set<String>> hashes = new HashMap<>();
String h1 = writeData(dir + "/f1.", aconf, 0, 333);
hashes.put("0333", new HashSet<>(List.of(h1)));
String h2 = writeData(dir + "/f2.", aconf, 0, 666);
hashes.get("0333").add(h2);
hashes.put("0666", new HashSet<>(List.of(h2)));
String h3 = writeData(dir + "/f3.", aconf, 334, 700);
hashes.get("0666").add(h3);
hashes.put("0999", new HashSet<>(List.of(h3)));
hashes.put("1333", Set.of());
hashes.put("1666", Set.of());
hashes.put("null", Set.of());

SortedSet<Text> splits = new TreeSet<>(c.tableOperations().listSplits(tableName));

List<String> loadPlans = new ArrayList<>();

for (String filename : List.of("f1.rf", "f2.rf", "f3.rf")) {
// The body of this loop simulates what each reducer would do
Path path = new Path(dir + "/" + filename);

// compute the load plan for the rfile
String lpJson = LoadPlan.compute(path.toUri(), splits).toJson();

// save the load plan to a file
Path lpPath = new Path(path.getParent(), path.getName().replace(".rf", ".lp"));
try (var output = getCluster().getFileSystem().create(lpPath, false)) {
IOUtils.write(lpJson, output, UTF_8);
}
}

// This simulates the code that would run after the map reduce job and bulk import the files
var builder = LoadPlan.builder();
for (var status : getCluster().getFileSystem().listStatus(new Path(dir),
p -> p.getName().endsWith(".lp"))) {
try (var input = getCluster().getFileSystem().open(status.getPath())) {
String lpJson = IOUtils.toString(input, UTF_8);
builder.addPlan(LoadPlan.fromJson(lpJson));
}
}

LoadPlan lpAll = builder.build();

c.tableOperations().importDirectory(dir).to(tableName).plan(lpAll).load();

verifyData(c, tableName, 0, 700, false);
verifyMetadata(c, tableName, hashes);
}
}

@Test
public void testEmptyDir() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
@@ -621,7 +681,7 @@ private void verifyMetadata(AccumuloClient client, String tableName,

String endRow = tablet.getEndRow() == null ? "null" : tablet.getEndRow().toString();

assertEquals(expectedHashes.get(endRow), fileHashes);
assertEquals(expectedHashes.get(endRow), fileHashes, "endRow "+endRow);

endRowsSeen.add(endRow);
}
