Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue=1258, t-cache support block-level cache evict #1266

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/io/tablet_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ DECLARE_bool(tera_leveldb_ignore_corruption_in_compaction);
DECLARE_bool(tera_leveldb_use_file_lock);

DECLARE_int32(tera_tabletnode_scan_pack_max_size);
DECLARE_bool(tera_tabletnode_cache_enabled);
DECLARE_int32(tera_leveldb_env_local_seek_latency);
DECLARE_int32(tera_leveldb_env_dfs_seek_latency);
DECLARE_int32(tera_memenv_table_cache_size);
DECLARE_bool(tera_use_flash_for_memenv);
DECLARE_bool(tera_tabletnode_block_cache_enabled);

DECLARE_bool(tera_tablet_use_memtable_on_leveldb);
DECLARE_int64(tera_tablet_memtable_ldb_write_buffer_size);
Expand Down Expand Up @@ -1676,18 +1676,25 @@ void TabletIO::SetupOptionsForLG() {
lg_info->env = LeveldbMockEnv();
} else if (store == MemoryStore) {
if (FLAGS_tera_use_flash_for_memenv) {
lg_info->env = LeveldbFlashEnv();
if (FLAGS_tera_tabletnode_block_cache_enabled) {
LOG(INFO) << "MemLG[" << lg_i << "] activate TCache";
lg_info->env = io::DefaultBlockCacheEnv();
} else {
lg_info->env = LeveldbFlashEnv();
}
} else {
lg_info->env = LeveldbMemEnv();
}
lg_info->seek_latency = 0;
lg_info->block_cache = m_memory_cache;
} else if (store == FlashStore) {
if (!FLAGS_tera_tabletnode_cache_enabled) {
lg_info->env = LeveldbFlashEnv();
if (FLAGS_tera_tabletnode_block_cache_enabled) {
//LOG(INFO) << "activate block-level Cache store";
//lg_info->env = leveldb::EnvThreeLevelCache();
LOG(INFO) << "FlashLG[" << lg_i << "] activate TCache";
lg_info->env = io::DefaultBlockCacheEnv();
} else {
LOG(INFO) << "activate block-level Cache store";
lg_info->env = leveldb::EnvThreeLevelCache();
lg_info->env = LeveldbFlashEnv();
}
lg_info->seek_latency = FLAGS_tera_leveldb_env_local_seek_latency;
} else {
Expand Down
18 changes: 18 additions & 0 deletions src/io/utils_leveldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/file/file_path.h"
#include "common/mutex.h"
#include "io/timekey_comparator.h"
#include "leveldb/block_cache.h"
#include "leveldb/comparator.h"
#include "leveldb/env_dfs.h"
#include "leveldb/env_flash.h"
Expand All @@ -31,6 +32,7 @@ DECLARE_string(tera_leveldb_env_hdfs2_nameservice_list);
DECLARE_string(tera_tabletnode_path_prefix);
DECLARE_string(tera_dfs_so_path);
DECLARE_string(tera_dfs_conf);
DECLARE_int32(tera_leveldb_block_cache_env_thread_num);

namespace tera {
namespace io {
Expand Down Expand Up @@ -66,6 +68,21 @@ leveldb::Env* LeveldbBaseEnv() {
}
}

// Tcache: default env
static pthread_once_t block_cache_once = PTHREAD_ONCE_INIT;
static leveldb::Env* default_block_cache_env;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥不像mem及flash一样,弄成函数内的静态变量

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是静态的

static void InitDefaultBlockCacheEnv() {
default_block_cache_env = new leveldb::BlockCacheEnv(LeveldbBaseEnv());
default_block_cache_env->SetBackgroundThreads(FLAGS_tera_leveldb_block_cache_env_thread_num);
LOG(INFO) << "init block cache, thread num " << FLAGS_tera_leveldb_block_cache_env_thread_num;
}

leveldb::Env* DefaultBlockCacheEnv() {
pthread_once(&block_cache_once, InitDefaultBlockCacheEnv);
return default_block_cache_env;
}

// mem env
leveldb::Env* LeveldbMemEnv() {
static Mutex mutex;
static leveldb::Env* mem_env = NULL;
Expand All @@ -78,6 +95,7 @@ leveldb::Env* LeveldbMemEnv() {
return mem_env;
}

// flash env
leveldb::Env* LeveldbFlashEnv() {
static Mutex mutex;
static leveldb::Env* flash_env = NULL;
Expand Down
2 changes: 2 additions & 0 deletions src/io/utils_leveldb.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ void InitDfsEnv();
// return the base env leveldb used (dfs/local), singleton
leveldb::Env* LeveldbBaseEnv();

leveldb::Env* DefaultBlockCacheEnv(); // ssd + base

// return the mem env leveldb used, singleton
leveldb::Env* LeveldbMemEnv();

Expand Down
2 changes: 1 addition & 1 deletion src/leveldb/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ include ../../depends.mk
include build_config.mk

CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT)
CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++11

LDFLAGS += $(PLATFORM_LDFLAGS) -L$(SNAPPY_LIBDIR) -lrt -ldl -lsnappy
LIBS += $(PLATFORM_LIBS)
Expand Down
3 changes: 0 additions & 3 deletions src/leveldb/db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ Status BuildTable(const std::string& dbname,
delete builder;

// Finish and check for file errors
if (s.ok()) {
s = file->Sync();
}
if (s.ok()) {
s = file->Close();
}
Expand Down
3 changes: 0 additions & 3 deletions src/leveldb/db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1179,9 +1179,6 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
compact->builder = NULL;

// Finish and check for file errors
if (s.ok()) {
s = compact->outfile->Sync();
}
if (s.ok()) {
s = compact->outfile->Close();
}
Expand Down
2 changes: 1 addition & 1 deletion src/leveldb/db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Status TableCache::FindTable(const std::string& dbname, const Options* options,

if (!s.ok()) {
assert(table == NULL);
fprintf(stderr, "open sstable file failed: [%s]\n", fname.c_str());
fprintf(stderr, "open sstable file failed: [%s] %s\n", fname.c_str(), s.ToString().c_str());
delete file;
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
Expand Down
106 changes: 106 additions & 0 deletions src/leveldb/include/leveldb/block_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
// Author: [email protected]

#ifndef STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H_
#define STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H_

#include "leveldb/env.h"
#include "leveldb/options.h"
#include "leveldb/status.h"

namespace leveldb {
/////////////////////////////////////////////
// Tcache
/////////////////////////////////////////////
extern uint64_t kBlockSize;
extern uint64_t kDataSetSize;
extern uint64_t kFidBatchNum;
extern uint64_t kCacheSize;
extern uint64_t kMetaBlockSize;
extern uint64_t kMetaTableSize;
extern uint64_t kWriteBufferSize;

struct BlockCacheOptions {
Options opts;
std::string cache_dir;
uint64_t block_size;
uint64_t dataset_size;
uint64_t fid_batch_num;
uint64_t cache_size;
uint64_t dataset_num;
uint64_t meta_block_cache_size;
uint64_t meta_table_cache_size;
uint64_t write_buffer_size;
Env* env;
Env* cache_env;

BlockCacheOptions()
: block_size(kBlockSize),
dataset_size(kDataSetSize),
fid_batch_num(kFidBatchNum),
cache_size(kCacheSize),
meta_block_cache_size(kMetaBlockSize),
meta_table_cache_size(kMetaTableSize),
write_buffer_size(kWriteBufferSize),
env(NULL) {
dataset_num = cache_size / dataset_size + 1;
}
};

class BlockCacheImpl;

class BlockCacheEnv : public EnvWrapper {
public:
BlockCacheEnv(Env* base);

~BlockCacheEnv();

virtual Status FileExists(const std::string& fname);

virtual Status GetChildren(const std::string& path,
std::vector<std::string>* result);

virtual Status DeleteFile(const std::string& fname);

virtual Status CreateDir(const std::string& name);

virtual Status DeleteDir(const std::string& name);

virtual Status CopyFile(const std::string& from,
const std::string& to);

virtual Status GetFileSize(const std::string& fname, uint64_t* size);

virtual Status RenameFile(const std::string& src, const std::string& target);

virtual Status LockFile(const std::string& fname, FileLock** lock);

virtual Status UnlockFile(FileLock* lock);

virtual Status NewSequentialFile(const std::string& fname,
SequentialFile** result); // never cache log

// cache relatively
virtual Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result); // cache Pread
virtual Status NewRandomAccessFile(const std::string& fname,
uint64_t fsize,
RandomAccessFile** result); // cache Pread

virtual Status NewWritableFile(const std::string& fname,
WritableFile** result); // cache Append
virtual Status LoadCache(const BlockCacheOptions& opts, const std::string& cache_dir);

private:
std::vector<BlockCacheImpl*> cache_vec_;
Env* dfs_env_;
};

Env* NewBlockCacheEnv(Env* base);

} // leveldb
#endif
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H_

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已改


27 changes: 27 additions & 0 deletions src/leveldb/include/leveldb/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,36 @@ namespace leveldb {

class Cache;

// An entry is a variable length heap-allocated structure. Entries
// are kept in a circular doubly linked list ordered by access time.
struct LRUHandle {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

挪出来的目的是?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

能填充cache_id

void* value;
void (*deleter)(const Slice&, void* value);
LRUHandle* next_hash;
LRUHandle* next;
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
size_t key_length;
uint32_t refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
uint64_t cache_id; // cache id, user spec
char key_data[1]; // Beginning of key

Slice key() const {
// For cheaper lookups, we allow a temporary Handle object
// to store a pointer to a key in "value".
if (next == this) {
return *(reinterpret_cast<Slice*>(value));
} else {
return Slice(key_data, key_length);
}
}
};

// Create a new cache with a fixed size capacity. This implementation
// of Cache uses a least-recently-used eviction policy.
extern Cache* NewLRUCache(size_t capacity);
extern Cache* New2QCache(size_t capacity);

class Cache {
public:
Expand Down
6 changes: 6 additions & 0 deletions src/leveldb/include/leveldb/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ class Slice {
size_ -= n;
}

// Drop the last "n" bytes from this slice.
void remove_suffix(size_t n) {
assert(n <= size());
size_ -= n;
}

// Return a string that contains the copy of the referenced data.
std::string ToString() const { return std::string(data_, size_); }

Expand Down
Loading