Skip to content

Commit

Permalink
Merge branch 'main' into NewAsyncBindings
Browse files Browse the repository at this point in the history
  • Loading branch information
gregschohn committed Apr 30, 2024
2 parents 748cccd + 60cd859 commit 4994898
Show file tree
Hide file tree
Showing 38 changed files with 548 additions and 207 deletions.
15 changes: 10 additions & 5 deletions RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ repositories {
mavenCentral()
}

ext {
awsSdkVersion = '2.25.16'
dataset = findProperty('dataset') ?: 'skip_dataset'
}

dependencies {
implementation 'com.beust:jcommander:1.81'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.2'
Expand All @@ -41,7 +46,11 @@ dependencies {
implementation platform('io.projectreactor:reactor-bom:2023.0.5')
implementation 'io.projectreactor.netty:reactor-netty-core'
implementation 'io.projectreactor.netty:reactor-netty-http'
implementation 'software.amazon.awssdk:s3:2.25.16'

implementation platform("software.amazon.awssdk:bom:$awsSdkVersion")
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:s3-transfer-manager'
implementation 'software.amazon.awssdk.crt:aws-crt:0.29.18'

testImplementation 'io.projectreactor:reactor-test:3.6.5'
testImplementation 'org.apache.logging.log4j:log4j-core:2.23.1'
Expand All @@ -61,10 +70,6 @@ clean.doFirst {
delete project.file("./docker/build")
}

ext {
dataset = findProperty('dataset') ?: 'skip_dataset'
}

task demoPrintOutSnapshot (type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'com.rfs.DemoPrintOutSnapshot'
Expand Down
2 changes: 1 addition & 1 deletion RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public static void main(String[] args) throws InterruptedException {
if (snapshotDirPath != null) {
repo = new FilesystemRepo(snapshotDirPath);
} else if (s3RepoUri != null && s3Region != null && s3LocalDirPath != null) {
repo = S3Repo.create(s3LocalDirPath, s3RepoUri, s3Region);
repo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
} else if (snapshotLocalRepoDirPath != null) {
repo = new FilesystemRepo(snapshotLocalRepoDirPath);
} else {
Expand Down
13 changes: 13 additions & 0 deletions RFS/src/main/java/com/rfs/common/FilesystemRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,56 @@ public FilesystemRepo(Path repoRootDir) {
this.repoRootDir = repoRootDir;
}

@Override
public Path getRepoRootDir() {
return repoRootDir;
}

@Override
public Path getSnapshotRepoDataFilePath() throws IOException {
return findRepoFile();
}

@Override
public Path getGlobalMetadataFilePath(String snapshotId) throws IOException {
String filePath = getRepoRootDir().toString() + "/meta-" + snapshotId + ".dat";
return Path.of(filePath);
}

@Override
public Path getSnapshotMetadataFilePath(String snapshotId) throws IOException {
String filePath = getRepoRootDir().toString() + "/snap-" + snapshotId + ".dat";
return Path.of(filePath);
}

@Override
public Path getIndexMetadataFilePath(String indexId, String indexFileId) throws IOException {
String filePath = getRepoRootDir().toString() + "/indices/" + indexId + "/meta-" + indexFileId + ".dat";
return Path.of(filePath);
}

@Override
public Path getShardDirPath(String indexId, int shardId) throws IOException {
String shardDirPath = getRepoRootDir().toString() + "/indices/" + indexId + "/" + shardId;
return Path.of(shardDirPath);
}

@Override
public Path getShardMetadataFilePath(String snapshotId, String indexId, int shardId) throws IOException {
Path shardDirPath = getShardDirPath(indexId, shardId);
Path filePath = shardDirPath.resolve("snap-" + snapshotId + ".dat");
return filePath;
}

@Override
public Path getBlobFilePath(String indexId, int shardId, String blobName) throws IOException {
Path shardDirPath = getShardDirPath(indexId, shardId);
Path filePath = shardDirPath.resolve(blobName);
return filePath;
}

@Override
public void prepBlobFiles(ShardMetadata.Data shardMetadata) throws IOException {
// No work necessary for local filesystem
}
}
132 changes: 86 additions & 46 deletions RFS/src/main/java/com/rfs/common/S3Repo.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,37 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
import software.amazon.awssdk.transfer.s3.model.DirectoryDownload;
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;

import java.util.Comparator;
import java.util.Optional;

public class S3Repo implements SourceRepo {
private static final Logger logger = LogManager.getLogger(S3Repo.class);
private static final double S3_TARGET_THROUGHPUT_GIBPS = 8.0; // Arbitrarily chosen
private static final long S3_MAX_MEMORY_BYTES = 1024L * 1024 * 1024; // Arbitrarily chosen
private static final long S3_MINIMUM_PART_SIZE_BYTES = 8L * 1024 * 1024; // Default, but be explicit

private final Path s3LocalDir;
private final String s3RepoUri;
private final S3Uri s3RepoUri;
private final String s3Region;
private final S3Client s3Client;
private final S3AsyncClient s3Client;

private static int extractVersion(String key) {
try {
Expand All @@ -35,130 +43,162 @@ private static int extractVersion(String key) {
}
}

protected String findRepoFileUri() {
String bucketName = getS3BucketName();
String prefix = getS3ObjectsPrefix();

protected S3Uri findRepoFileUri() {
ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
.bucket(bucketName)
.prefix(prefix)
.bucket(s3RepoUri.bucketName)
.prefix(s3RepoUri.key)
.build();

ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest);
ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest).join();

Optional<S3Object> highestVersionedIndexFile = listResponse.contents().stream()
.filter(s3Object -> s3Object.key().matches(".*index-\\d+$")) // Regex to match index files
.max(Comparator.comparingInt(s3Object -> extractVersion(s3Object.key())));

return highestVersionedIndexFile
.map(s3Object -> "s3://" + bucketName + "/" + s3Object.key())
.orElse(null);
String rawUri = highestVersionedIndexFile
.map(s3Object -> "s3://" + s3RepoUri.bucketName + "/" + s3Object.key())
.orElse("");
return new S3Uri(rawUri);
}

protected void ensureS3LocalDirectoryExists(Path localPath) throws IOException {
Files.createDirectories(localPath);
}

private void downloadFile(String s3Uri, Path localPath) throws IOException {
logger.info("Downloading file from S3: " + s3Uri + " to " + localPath);
protected boolean doesFileExistLocally(Path localPath) {
return Files.exists(localPath);
}

private void ensureFileExistsLocally(S3Uri s3Uri, Path localPath) throws IOException {
ensureS3LocalDirectoryExists(localPath.getParent());

String bucketName = s3Uri.split("/")[2];
String key = s3Uri.split(bucketName + "/")[1];
if (doesFileExistLocally(localPath)) {
logger.debug("File already exists locally: " + localPath);
return;
}

logger.info("Downloading file from S3: " + s3Uri.uri + " to " + localPath);
GetObjectRequest getObjectRequest = GetObjectRequest.builder()
.bucket(bucketName)
.key(key)
.bucket(s3Uri.bucketName)
.key(s3Uri.key)
.build();

s3Client.getObject(getObjectRequest, ResponseTransformer.toFile(localPath));
s3Client.getObject(getObjectRequest, AsyncResponseTransformer.toFile(localPath)).join();
}

public static S3Repo create(Path s3LocalDir, String s3Uri, String s3Region) {
S3Client s3Client = S3Client.builder()
.region(Region.of(s3Region))
.credentialsProvider(DefaultCredentialsProvider.create())
.build();
public static S3Repo create(Path s3LocalDir, S3Uri s3Uri, String s3Region) {
S3AsyncClient s3Client = S3AsyncClient.crtBuilder()
.credentialsProvider(DefaultCredentialsProvider.create())
.region(Region.of(s3Region))
.retryConfiguration(r -> r.numRetries(3))
.targetThroughputInGbps(S3_TARGET_THROUGHPUT_GIBPS)
.maxNativeMemoryLimitInBytes(S3_MAX_MEMORY_BYTES)
.minimumPartSizeInBytes(S3_MINIMUM_PART_SIZE_BYTES)
.build();

return new S3Repo(s3LocalDir, s3Uri, s3Region, s3Client);
}

public S3Repo(Path s3LocalDir, String s3Uri, String s3Region, S3Client s3Client) {
public S3Repo(Path s3LocalDir, S3Uri s3Uri, String s3Region, S3AsyncClient s3Client) {
this.s3LocalDir = s3LocalDir;
this.s3RepoUri = s3Uri;
this.s3Region = s3Region;
this.s3Client = s3Client;
}

@Override
public Path getRepoRootDir() {
return s3LocalDir;
}

@Override
public Path getSnapshotRepoDataFilePath() throws IOException {
String repoFileS3Uri = findRepoFileUri();
S3Uri repoFileS3Uri = findRepoFileUri();

String relativeFileS3Uri = repoFileS3Uri.substring(s3RepoUri.length() + 1);
String relativeFileS3Uri = repoFileS3Uri.uri.substring(s3RepoUri.uri.length() + 1);

Path localFilePath = s3LocalDir.resolve(relativeFileS3Uri);
downloadFile(repoFileS3Uri, localFilePath);
ensureFileExistsLocally(repoFileS3Uri, localFilePath);

return localFilePath;
}

@Override
public Path getGlobalMetadataFilePath(String snapshotId) throws IOException {
String suffix = "meta-" + snapshotId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

@Override
public Path getSnapshotMetadataFilePath(String snapshotId) throws IOException {
String suffix = "snap-" + snapshotId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

@Override
public Path getIndexMetadataFilePath(String indexId, String indexFileId) throws IOException {
String suffix = "indices/" + indexId + "/meta-" + indexFileId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

@Override
public Path getShardDirPath(String indexId, int shardId) throws IOException {
String suffix = "indices/" + indexId + "/" + shardId;
Path shardDirPath = s3LocalDir.resolve(suffix);
return shardDirPath;
}

@Override
public Path getShardMetadataFilePath(String snapshotId, String indexId, int shardId) throws IOException {
String suffix = "indices/" + indexId + "/" + shardId + "/snap-" + snapshotId + ".dat";
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

@Override
public Path getBlobFilePath(String indexId, int shardId, String blobName) throws IOException {
String suffix = "indices/" + indexId + "/" + shardId + "/" + blobName;
Path filePath = s3LocalDir.resolve(suffix);
downloadFile(s3RepoUri + "/" + suffix, filePath);
S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix);
ensureFileExistsLocally(fileUri, filePath);
return filePath;
}

public String getS3BucketName() {
String[] parts = s3RepoUri.substring(5).split("/", 2);
return parts[0];
}
@Override
public void prepBlobFiles(ShardMetadata.Data shardMetadata) throws IOException {
S3TransferManager transferManager = S3TransferManager.builder().s3Client(s3Client).build();

Path shardDirPath = getShardDirPath(shardMetadata.getIndexId(), shardMetadata.getShardId());
ensureS3LocalDirectoryExists(shardDirPath);

public String getS3ObjectsPrefix() {
String[] parts = s3RepoUri.substring(5).split("/", 2);
String prefix = parts.length > 1 ? parts[1] : "";
String blobFilesS3Prefix = s3RepoUri.key + "indices/" + shardMetadata.getIndexId() + "/" + shardMetadata.getShardId() + "/";

if (!prefix.isEmpty() && prefix.endsWith("/")) {
prefix = prefix.substring(0, prefix.length() - 1);
}
logger.info("Downloading blob files from S3: s3://" + s3RepoUri.bucketName + "/" + blobFilesS3Prefix + " to " + shardDirPath);
DirectoryDownload directoryDownload = transferManager.downloadDirectory(
DownloadDirectoryRequest.builder()
.destination(shardDirPath)
.bucket(s3RepoUri.bucketName)
.listObjectsV2RequestTransformer(l -> l.prefix(blobFilesS3Prefix))
.build()
);

// Wait for the transfer to complete
CompletedDirectoryDownload completedDirectoryDownload = directoryDownload.completionFuture().join();

logger.info("Blob file download(s) complete");

return prefix;
// Print out any failed downloads
completedDirectoryDownload.failedTransfers().forEach(logger::error);
}
}
20 changes: 20 additions & 0 deletions RFS/src/main/java/com/rfs/common/S3Uri.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.rfs.common;

public class S3Uri {
public final String bucketName;
public final String key;
public final String uri;

public S3Uri(String rawUri) {
String[] parts = rawUri.substring(5).split("/", 2);
bucketName = parts[0];

String keyRaw = parts.length > 1 ? parts[1] : "";
if (!keyRaw.isEmpty() && keyRaw.endsWith("/")) {
keyRaw = keyRaw.substring(0, keyRaw.length() - 1);
}
key = keyRaw;

uri = rawUri.endsWith("/") ? rawUri.substring(0, rawUri.length() - 1) : rawUri;
}
}
3 changes: 3 additions & 0 deletions RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public static void unpack(SourceRepo repo, ShardMetadata.Data shardMetadata, Pat
// Some constants
NativeFSLockFactory lockFactory = NativeFSLockFactory.INSTANCE;

// Ensure the blob files are prepped, if they need to be
repo.prepBlobFiles(shardMetadata);

// Create the directory for the shard's lucene files
Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId());
Files.createDirectories(luceneIndexDir);
Expand Down
6 changes: 6 additions & 0 deletions RFS/src/main/java/com/rfs/common/SourceRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,10 @@ public interface SourceRepo {
public Path getShardDirPath(String indexId, int shardId) throws IOException;
public Path getShardMetadataFilePath(String snapshotId, String indexId, int shardId) throws IOException;
public Path getBlobFilePath(String indexId, int shardId, String blobName) throws IOException;

/*
* Performs any work necessary to facilitate access to a given shard's blob files. Depending on the implementation,
* may involve no work at all, bulk downloading objects from a remote source, or any other operations.
*/
public void prepBlobFiles(ShardMetadata.Data shardMetadata) throws IOException;
}
Loading

0 comments on commit 4994898

Please sign in to comment.