forked from slackhq/astra
Commit
Introduce dual layer (disk / heap) S3 LRU cache for cache nodes (slackhq#1123)
Co-authored-by: Bryan Burkholder <[email protected]>
Showing 9 changed files with 738 additions and 119 deletions.
185 changes: 185 additions & 0 deletions
astra/src/main/java/com/slack/astra/blobfs/DiskCachePagingLoader.java
@@ -0,0 +1,185 @@
package com.slack.astra.blobfs;

import static com.slack.astra.blobfs.S3CachePagingLoader.DISK_PAGE_SIZE;
import static com.slack.astra.util.SizeConstant.GB;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.FileTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

/** Caches S3 data onto the disk, helping to reduce the penalty of S3 read latency. */
public class DiskCachePagingLoader {

  // Total size of the disk cache - may temporarily exceed this so some buffer should be available
  // on the host
  public static final String ASTRA_S3_STREAMING_DISK_CACHE_SIZE = "astra.s3Streaming.diskCacheSize";
  protected static final long DISK_CACHE_SIZE =
      Long.parseLong(
          System.getProperty(ASTRA_S3_STREAMING_DISK_CACHE_SIZE, String.valueOf(200 * GB)));

  private static final Logger LOG = LoggerFactory.getLogger(DiskCachePagingLoader.class);

  private final LoadingCache<LoadingCacheKey, Path> diskCache =
      Caffeine.newBuilder()
          .maximumWeight(DISK_CACHE_SIZE)
          .scheduler(Scheduler.systemScheduler())
          .evictionListener(evictionListener())
          .weigher(weigher())
          .build(this.bytesCacheLoader());

  private final BlobStore blobStore;
  private final S3AsyncClient s3AsyncClient;
  private final long pageSize;

  public DiskCachePagingLoader(BlobStore blobStore, S3AsyncClient s3AsyncClient, long pageSize) {
    this.blobStore = blobStore;
    this.s3AsyncClient = s3AsyncClient;
    this.pageSize = pageSize;
  }

  private static Weigher<LoadingCacheKey, Path> weigher() {
    return (_, value) -> {
      try {
        // todo - consider reworking weights to use kb instead of bytes? This will fail if files
        // exceed 2GB (int overflow)
        long fileSize = Files.size(value);
        LOG.debug("Calculated size of path {} is {} bytes", value, fileSize);
        return Math.toIntExact(fileSize);
      } catch (IOException e) {
        LOG.error("Error calculating size", e);
        // if we can't calculate just use the max page size
        return Math.toIntExact(DISK_PAGE_SIZE);
      }
    };
  }

  private static RemovalListener<LoadingCacheKey, Path> evictionListener() {
    return (cacheKey, path, removalCause) -> {
      if (cacheKey != null) {
        LOG.debug(
            "Evicting from disk cache - chunkID: {} / filename: {}, fromOffset: {}, toOffset: {} / cause: {}",
            cacheKey.getChunkId(),
            cacheKey.getFilename(),
            cacheKey.getFromOffset(),
            cacheKey.getToOffset(),
            removalCause);
      }
      if (path != null) {
        try {
          Files.deleteIfExists(path);
        } catch (IOException e) {
          LOG.error("Failed to delete file {}, {}, {}", cacheKey, path, removalCause, e);
        }
      } else {
        LOG.error("Path was unexpectedly null, {}, {}, {}", cacheKey, path, removalCause);
      }
    };
  }

  private CacheLoader<LoadingCacheKey, Path> bytesCacheLoader() {
    return key -> {
      LOG.debug(
          "Using S3 to load disk cache - chunkID: {} / filename: {}, fromOffset: {}, toOffset: {}",
          key.getChunkId(),
          key.getFilename(),
          key.getFromOffset(),
          key.getToOffset());

      // todo - consider making this configurable to a different directory (or using the data dir
      // value)
      Path filePath =
          Path.of(
              System.getProperty("java.io.tmpdir"),
              String.format(
                  "astra-cache-%s-%s-%s-%s.tmp",
                  key.getChunkId(), key.getFilename(), key.getFromOffset(), key.getToOffset()));
      s3AsyncClient
          .getObject(
              GetObjectRequest.builder()
                  .bucket(blobStore.bucketName)
                  .key(key.getPath())
                  .range(String.format("bytes=%s-%s", key.getFromOffset(), key.getToOffset()))
                  .build(),
              AsyncResponseTransformer.toFile(
                  filePath,
                  FileTransformerConfiguration.builder()
                      .failureBehavior(FileTransformerConfiguration.FailureBehavior.DELETE)
                      .fileWriteOption(
                          FileTransformerConfiguration.FileWriteOption.CREATE_OR_REPLACE_EXISTING)
                      .build()))
          .get();
      return filePath;
    };
  }

  public void readBytes(
      String chunkId,
      String filename,
      byte[] b,
      int startingOffset,
      long originalPointer,
      int totalLength)
      throws ExecutionException, IOException {
    // pointer here is the "global" file pointer
    int currentOffset = startingOffset;
    long currentPointer = originalPointer;
    int remainingLengthToRead = totalLength;

    for (LoadingCacheKey cacheKey : getCacheKeys(chunkId, filename, originalPointer, totalLength)) {
      // the relative pointer for the file on disk
      long relativePointer = currentPointer % pageSize;

      try (FileChannel fileChannel =
          FileChannel.open(diskCache.get(cacheKey), StandardOpenOption.READ)) {
        fileChannel.position(relativePointer);

        // if we need to read in everything
        if (currentPointer + remainingLengthToRead > cacheKey.getToOffset()) {
          // read from the relative pointer to the end
          int lengthToRead = Math.toIntExact(pageSize - relativePointer);
          ByteBuffer byteBuffer = ByteBuffer.wrap(b, currentOffset, lengthToRead);
          fileChannel.read(byteBuffer);

          currentOffset += lengthToRead;
          currentPointer += lengthToRead;
          remainingLengthToRead -= lengthToRead;
        } else {
          ByteBuffer byteBuffer = ByteBuffer.wrap(b, currentOffset, remainingLengthToRead);
          fileChannel.read(byteBuffer);
          break;
        }
      }
    }
  }

  public List<LoadingCacheKey> getCacheKeys(
      String chunkId, String filename, long originalPointer, int len) {
    long startingPage = Math.floorDiv(originalPointer, pageSize);
    long endingPage = Math.ceilDiv(originalPointer + len, pageSize);

    List<LoadingCacheKey> cacheKeys = new ArrayList<>(Math.toIntExact(endingPage - startingPage));
    for (long i = startingPage; i < endingPage; i++) {
      cacheKeys.add(
          new LoadingCacheKey(chunkId, filename, i * pageSize, i * pageSize + pageSize - 1));
    }
    return cacheKeys;
  }
}
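The getCacheKeys logic above splits an arbitrary read into fixed-size, non-overlapping pages: a read of len bytes starting at originalPointer touches pages floorDiv(originalPointer, pageSize) up to (but excluding) ceilDiv(originalPointer + len, pageSize), and each page covers the closed byte range [i * pageSize, i * pageSize + pageSize - 1], which maps directly onto the S3 Range header used by the loader. Below is a standalone sketch of that arithmetic on concrete numbers; it is not part of the commit, and the 100 MiB page size is purely illustrative.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the page-range arithmetic used by getCacheKeys above.
// Requires Java 18+ for Math.ceilDiv, matching the source file.
public class PagingMathSketch {
  record PageRange(long fromOffset, long toOffset) {}

  static List<PageRange> pagesFor(long pointer, int len, long pageSize) {
    long startingPage = Math.floorDiv(pointer, pageSize);
    long endingPage = Math.ceilDiv(pointer + len, pageSize);
    List<PageRange> pages = new ArrayList<>(Math.toIntExact(endingPage - startingPage));
    for (long i = startingPage; i < endingPage; i++) {
      // Each page is a closed byte range, matching the "bytes=from-to" Range header format.
      pages.add(new PageRange(i * pageSize, i * pageSize + pageSize - 1));
    }
    return pages;
  }

  public static void main(String[] args) {
    long pageSize = 100L * 1024 * 1024; // assumed 100 MiB page, for illustration only
    // A 5 MiB read starting 2 MiB before a page boundary spans exactly two pages.
    long pointer = pageSize - 2L * 1024 * 1024;
    int len = 5 * 1024 * 1024;
    pagesFor(pointer, len, pageSize)
        .forEach(p -> System.out.printf("page bytes=%d-%d%n", p.fromOffset(), p.toOffset()));
  }
}

With these inputs the read straddles one page boundary, so at most two ranged S3 GETs are needed to warm the disk cache for it.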
155 changes: 155 additions & 0 deletions
astra/src/main/java/com/slack/astra/blobfs/HeapCachePagingLoader.java
@@ -0,0 +1,155 @@
package com.slack.astra.blobfs;

import static com.slack.astra.util.SizeConstant.GB;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;

/** Caches S3 data into the heap, passing through to the disk cache if the data is not available. */
public class HeapCachePagingLoader {
  private static final Logger LOG = LoggerFactory.getLogger(HeapCachePagingLoader.class);

  public static final String ASTRA_S3_STREAMING_HEAP_CACHE_SIZE = "astra.s3Streaming.heapCacheSize";
  protected static final long HEAP_CACHE_SIZE =
      Long.parseLong(
          System.getProperty(ASTRA_S3_STREAMING_HEAP_CACHE_SIZE, String.valueOf(1 * GB)));

  private final LoadingCache<LoadingCacheKey, byte[]> heapCache =
      Caffeine.newBuilder()
          .maximumWeight(HEAP_CACHE_SIZE)
          .scheduler(Scheduler.systemScheduler())
          .removalListener(heapRemovalListener())
          .weigher(weigher())
          .build(this.bytesCacheLoader());

  private final LoadingCache<LoadingCacheKey, Long> fileLengthCache =
      Caffeine.newBuilder().maximumSize(25000).build(this.fileLengthLoader());

  private final BlobStore blobStore;
  private final S3AsyncClient s3AsyncClient;
  private final DiskCachePagingLoader diskCachePagingLoader;
  private final long pageSize;

  public HeapCachePagingLoader(
      BlobStore blobStore,
      S3AsyncClient s3AsyncClient,
      DiskCachePagingLoader diskCachePagingLoader,
      long pageSize) {
    this.blobStore = blobStore;
    this.s3AsyncClient = s3AsyncClient;
    this.diskCachePagingLoader = diskCachePagingLoader;
    this.pageSize = pageSize;
  }

  private static Weigher<LoadingCacheKey, byte[]> weigher() {
    return (_, value) -> value.length;
  }

  private static RemovalListener<LoadingCacheKey, byte[]> heapRemovalListener() {
    return (key, _, cause) -> {
      if (key != null) {
        LOG.debug(
            "Evicting from heap cache - chunkID: {} / filename: {}, fromOffset: {}, toOffset: {} / cause: {}",
            key.getChunkId(),
            key.getFilename(),
            key.getFromOffset(),
            key.getToOffset(),
            cause);
      }
    };
  }

  private CacheLoader<LoadingCacheKey, byte[]> bytesCacheLoader() {
    return key -> {
      LOG.debug(
          "Using disk cache to load heap cache - chunkID: {} / filename: {}, fromOffset: {}, toOffset: {}",
          key.getChunkId(),
          key.getFilename(),
          key.getFromOffset(),
          key.getToOffset());
      int length = Math.toIntExact(key.getToOffset() - key.getFromOffset() + 1);
      byte[] bytes = new byte[length];
      diskCachePagingLoader.readBytes(
          key.getChunkId(), key.getFilename(), bytes, 0, key.getFromOffset(), length);
      return bytes;
    };
  }

  private CacheLoader<LoadingCacheKey, Long> fileLengthLoader() {
    return key ->
        s3AsyncClient
            .headObject(
                HeadObjectRequest.builder().bucket(blobStore.bucketName).key(key.getPath()).build())
            .get()
            .contentLength();
  }

  public void readBytes(
      String chunkId,
      String filename,
      byte[] b,
      int startingOffset,
      long originalPointer,
      int totalLength)
      throws ExecutionException, IOException {
    int currentOffset = startingOffset;
    long currentPointer = originalPointer;
    int remainingLengthToRead = totalLength;

    for (LoadingCacheKey cacheKey : getCacheKeys(chunkId, filename, originalPointer, totalLength)) {
      long relativePointer = currentPointer % pageSize;
      // if we need to read in everything
      if (currentPointer + remainingLengthToRead > cacheKey.getToOffset()) {
        // read from the relative pointer to the end
        int lengthToRead = Math.toIntExact(pageSize - relativePointer);
        System.arraycopy(
            heapCache.get(cacheKey),
            Math.toIntExact(relativePointer),
            b,
            currentOffset,
            lengthToRead);

        currentOffset += lengthToRead;
        currentPointer += lengthToRead;
        remainingLengthToRead -= lengthToRead;
      } else {
        System.arraycopy(
            heapCache.get(cacheKey),
            Math.toIntExact(relativePointer),
            b,
            currentOffset,
            remainingLengthToRead);
        break;
      }
    }
  }

  public long length(String chunkId, String filename) throws ExecutionException {
    return fileLengthCache.get(new LoadingCacheKey(chunkId, filename));
  }

  public List<LoadingCacheKey> getCacheKeys(
      String chunkId, String filename, long originalPointer, int len) {
    long startingPage = Math.floorDiv(originalPointer, pageSize);
    long endingPage = Math.ceilDiv(originalPointer + len, pageSize);

    List<LoadingCacheKey> cacheKeys = new ArrayList<>(Math.toIntExact(endingPage - startingPage));
    for (long i = startingPage; i < endingPage; i++) {
      cacheKeys.add(
          new LoadingCacheKey(chunkId, filename, i * pageSize, i * pageSize + pageSize - 1));
    }
    return cacheKeys;
  }
}
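Taken together, the two classes form the dual layer LRU named in the commit title: a heap-cache miss is filled from the disk cache, and a disk-cache miss issues a ranged GetObject against S3. The following wiring sketch is hypothetical and not taken from this change set: the class name, bucket name, chunk ID, and filename are made up, the BlobStore constructor signature is assumed, and the page sizes are placeholders for the real DISK_PAGE_SIZE / HEAP_PAGE_SIZE constants defined in S3CachePagingLoader, which is not shown in this excerpt.

import com.slack.astra.blobfs.BlobStore;
import com.slack.astra.blobfs.DiskCachePagingLoader;
import com.slack.astra.blobfs.HeapCachePagingLoader;
import software.amazon.awssdk.services.s3.S3AsyncClient;

// Hypothetical wiring of the two cache layers introduced in this commit.
public class DualLayerCacheSketch {
  public static void main(String[] args) throws Exception {
    // Default-credential async client; in Astra this comes from the existing S3 configuration.
    S3AsyncClient s3AsyncClient = S3AsyncClient.create();

    // Constructor signature assumed for illustration; reuse the cache node's existing BlobStore.
    BlobStore blobStore = new BlobStore(s3AsyncClient, "astra-blob-bucket");

    long diskPageSize = 100L * 1024 * 1024; // assumed; real value is S3CachePagingLoader.DISK_PAGE_SIZE
    long heapPageSize = 2L * 1024 * 1024; // assumed; real value is S3CachePagingLoader.HEAP_PAGE_SIZE

    DiskCachePagingLoader diskLoader =
        new DiskCachePagingLoader(blobStore, s3AsyncClient, diskPageSize);
    HeapCachePagingLoader heapLoader =
        new HeapCachePagingLoader(blobStore, s3AsyncClient, diskLoader, heapPageSize);

    // Read 4 KiB starting at byte 1,048,576 of a chunk file. A heap miss pulls the containing
    // heap page from the disk cache, and a disk miss fetches the containing disk page from S3.
    byte[] buffer = new byte[4096];
    heapLoader.readBytes("chunk-id", "_0.cfs", buffer, 0, 1_048_576L, buffer.length);

    // File length is resolved via HeadObject and memoized in the fileLengthCache.
    long fileLength = heapLoader.length("chunk-id", "_0.cfs");
    System.out.printf("read %d of %d bytes%n", buffer.length, fileLength);
  }
}

In the actual cache nodes this wiring is presumably handled by S3CachePagingLoader and the index input code added elsewhere in this commit rather than done by hand as above.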