Skip to content

Commit

Permalink
issue=1258, Tcache support block-level cache evict
Browse files Browse the repository at this point in the history
  • Loading branch information
caijieming committed Aug 3, 2017
1 parent 73c4d3a commit 27af669
Showing 1 changed file with 33 additions and 9 deletions.
42 changes: 33 additions & 9 deletions src/leveldb/util/block_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace leveldb {
/////////////////////////////////////////////
// Tcache
/////////////////////////////////////////////
uint64_t kBlockSize = 8192UL;
uint64_t kBlockSize = 4096UL;
uint64_t kDataSetSize = 134217728UL;
uint64_t kFidBatchNum = 200000UL;
uint64_t kCacheSize = 350000000000UL;
Expand Down Expand Up @@ -644,6 +644,10 @@ class BlockCacheRandomAccessFile : public RandomAccessFile {
reader->block = block;
reader->offset = offset;
reader->n = n;
Log("[%s] pread in miss list, %s, offset %lu, size %lu\n",
cache_->WorkPath().c_str(),
block->ToString().c_str(),
offset, n);
cache_->bg_read_.Schedule(&BlockCacheRandomAccessFile::AsyncDfsRead, reader, 10);
}

Expand All @@ -653,6 +657,9 @@ class BlockCacheRandomAccessFile : public RandomAccessFile {
AsyncCacheReader* reader = new AsyncCacheReader;
reader->file = const_cast<BlockCacheRandomAccessFile*>(this);
reader->block = block;
Log("[%s] pread in valid list, %s\n",
cache_->WorkPath().c_str(),
block->ToString().c_str());
cache_->bg_read_.Schedule(&BlockCacheRandomAccessFile::AsyncCacheRead, reader, 10);
}

Expand All @@ -661,25 +668,39 @@ class BlockCacheRandomAccessFile : public RandomAccessFile {
MutexLock lockgard(&cache_->mu_);
CacheBlock* block = c_miss[i];
block->cv.Wait();
Log("[%s] pread in miss list(dfs done), %s\n",
cache_->WorkPath().c_str(),
block->ToString().c_str());
}

for (uint32_t i = 0; i < c_miss.size(); ++i) {
CacheBlock* block = c_miss[i];
AsyncCacheWriter* writer = new AsyncCacheWriter;
writer->file = const_cast<BlockCacheRandomAccessFile*>(this);
writer->block = block;
Log("[%s] pread in miss list(fill cache), %s\n",
cache_->WorkPath().c_str(),
block->ToString().c_str());
cache_->bg_fill_.Schedule(&BlockCacheRandomAccessFile::AsyncCacheWrite, writer, 10);
}

for (uint32_t i = 0; i < c_miss.size(); ++i) { // wait cache fill finish
MutexLock lockgard(&cache_->mu_);
CacheBlock* block = c_miss[i];
block->cv.Wait();
Log("[%s] pread in miss list(fill cache done), %s\n",
cache_->WorkPath().c_str(),
block->ToString().c_str());
}

// wait cache read done
for (uint32_t i = 0; i < c_valid.size(); ++i) {
MutexLock lockgard(&cache_->mu_);
CacheBlock* block = c_valid[i];
block->cv.Wait();
Log("[%s] pread in valid list(done), %s\n",
cache_->WorkPath().c_str(),
block->ToString().c_str());
}

// wait other async read finish
Expand All @@ -689,8 +710,6 @@ class BlockCacheRandomAccessFile : public RandomAccessFile {
while (block->state != kCacheBlockValid) {
block->cv.Wait();
}
assert(block->data_block_alloc); // MUST alloc by block cache itself
assert(block->data_block.data() != NULL);
}

// fill user mem
Expand Down Expand Up @@ -720,16 +739,19 @@ class BlockCacheRandomAccessFile : public RandomAccessFile {
for (uint32_t i = 0; i < c_miss.size(); ++i) {
CacheBlock* block = c_miss[i];
block->state = kCacheBlockValid;
Log("[%s] wakeup for miss, %s\n", cache_->WorkPath().c_str(), block->ToString().c_str());
cache_->ReleaseBlock(block);
}
for (uint32_t i = 0; i < c_valid.size(); ++i) {
CacheBlock* block = c_valid[i];
block->state = kCacheBlockValid;
Log("[%s] wakeup for valid, %s\n", cache_->WorkPath().c_str(), block->ToString().c_str());
cache_->ReleaseBlock(block);
}
for (uint32_t i = 0; i < c_locked.size(); ++i) {
CacheBlock* block = c_locked[i];
block->state = kCacheBlockValid;
Log("[%s] wakeup for lock, %s\n", cache_->WorkPath().c_str(), block->ToString().c_str());
cache_->ReleaseBlock(block);
}

Expand Down Expand Up @@ -759,11 +781,9 @@ class BlockCacheRandomAccessFile : public RandomAccessFile {
char* scratch = (char*)(block->data_block.data());
Slice result;
uint64_t offset = block->block_idx * cache_->options_.block_size;
size_t n = (offset + cache_->options_.block_size) > (reader->offset + reader->n) ?
(reader->offset + reader->n) % cache_->options_.block_size
: cache_->options_.block_size;
size_t n = cache_->options_.block_size;
s = dfs_file_->Read(offset, n, &result, scratch);
Log("[%s] cache async.dfs read, %s, ori.offset %lu, origin.size %lu"
Log("[%s] cache async.dfs read, %s, origin.offset %lu, origin.size %lu"
", offset %lu, size %lu, status %s, res %lu\n",
cache_->WorkPath().c_str(), block->ToString().c_str(),
reader->offset, reader->n,
Expand Down Expand Up @@ -806,8 +826,11 @@ class BlockCacheRandomAccessFile : public RandomAccessFile {
}
void HandleCacheWrite(AsyncCacheWriter* writer) {
CacheBlock* block = writer->block;
block->s = cache_->LogRecord(block);
block->s = cache_->FillCache(block);
Log("[%s] handle cache write, %s\n",
cache_->WorkPath().c_str(),
block->ToString().c_str());
cache_->LogRecord(block);
cache_->FillCache(block);

MutexLock lockgard(&cache_->mu_);
block->cv.SignalAll();
Expand Down Expand Up @@ -1220,6 +1243,7 @@ Status BlockCacheImpl::ReleaseBlock(CacheBlock* block) {
DataSet* ds = GetDataSet(block->sid); // get and alloc ds
block->ReleaseDataBlock();
block->handle = NULL;
block->cv.SignalAll();
ds->cache->Release((Cache::Handle*)h);
mu_.Unlock();

Expand Down

0 comments on commit 27af669

Please sign in to comment.