diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc index 81222e447..6de53b462 100644 --- a/src/io/tablet_io.cc +++ b/src/io/tablet_io.cc @@ -59,11 +59,11 @@ DECLARE_bool(tera_leveldb_ignore_corruption_in_compaction); DECLARE_bool(tera_leveldb_use_file_lock); DECLARE_int32(tera_tabletnode_scan_pack_max_size); -DECLARE_bool(tera_tabletnode_cache_enabled); DECLARE_int32(tera_leveldb_env_local_seek_latency); DECLARE_int32(tera_leveldb_env_dfs_seek_latency); DECLARE_int32(tera_memenv_table_cache_size); DECLARE_bool(tera_use_flash_for_memenv); +DECLARE_bool(tera_tabletnode_block_cache_enabled); DECLARE_bool(tera_tablet_use_memtable_on_leveldb); DECLARE_int64(tera_tablet_memtable_ldb_write_buffer_size); @@ -1676,18 +1676,25 @@ void TabletIO::SetupOptionsForLG() { lg_info->env = LeveldbMockEnv(); } else if (store == MemoryStore) { if (FLAGS_tera_use_flash_for_memenv) { - lg_info->env = LeveldbFlashEnv(); + if (FLAGS_tera_tabletnode_block_cache_enabled) { + LOG(INFO) << "MemLG[" << lg_i << "] activate TCache"; + lg_info->env = io::DefaultBlockCacheEnv(); + } else { + lg_info->env = LeveldbFlashEnv(); + } } else { lg_info->env = LeveldbMemEnv(); } lg_info->seek_latency = 0; lg_info->block_cache = m_memory_cache; } else if (store == FlashStore) { - if (!FLAGS_tera_tabletnode_cache_enabled) { - lg_info->env = LeveldbFlashEnv(); + if (FLAGS_tera_tabletnode_block_cache_enabled) { + //LOG(INFO) << "activate block-level Cache store"; + //lg_info->env = leveldb::EnvThreeLevelCache(); + LOG(INFO) << "FlashLG[" << lg_i << "] activate TCache"; + lg_info->env = io::DefaultBlockCacheEnv(); } else { - LOG(INFO) << "activate block-level Cache store"; - lg_info->env = leveldb::EnvThreeLevelCache(); + lg_info->env = LeveldbFlashEnv(); } lg_info->seek_latency = FLAGS_tera_leveldb_env_local_seek_latency; } else { diff --git a/src/io/utils_leveldb.cc b/src/io/utils_leveldb.cc index 253e23f56..c6d16e2a8 100644 --- a/src/io/utils_leveldb.cc +++ b/src/io/utils_leveldb.cc @@ -15,6 +15,7 @@ #include "common/file/file_path.h" #include "common/mutex.h" #include "io/timekey_comparator.h" +#include "leveldb/block_cache.h" #include "leveldb/comparator.h" #include "leveldb/env_dfs.h" #include "leveldb/env_flash.h" @@ -31,6 +32,7 @@ DECLARE_string(tera_leveldb_env_hdfs2_nameservice_list); DECLARE_string(tera_tabletnode_path_prefix); DECLARE_string(tera_dfs_so_path); DECLARE_string(tera_dfs_conf); +DECLARE_int32(tera_leveldb_block_cache_env_thread_num); namespace tera { namespace io { @@ -66,6 +68,21 @@ leveldb::Env* LeveldbBaseEnv() { } } +// Tcache: default env +static pthread_once_t block_cache_once = PTHREAD_ONCE_INIT; +static leveldb::Env* default_block_cache_env; +static void InitDefaultBlockCacheEnv() { + default_block_cache_env = new leveldb::BlockCacheEnv(LeveldbBaseEnv()); + default_block_cache_env->SetBackgroundThreads(FLAGS_tera_leveldb_block_cache_env_thread_num); + LOG(INFO) << "init block cache, thread num " << FLAGS_tera_leveldb_block_cache_env_thread_num; +} + +leveldb::Env* DefaultBlockCacheEnv() { + pthread_once(&block_cache_once, InitDefaultBlockCacheEnv); + return default_block_cache_env; +} + +// mem env leveldb::Env* LeveldbMemEnv() { static Mutex mutex; static leveldb::Env* mem_env = NULL; @@ -78,6 +95,7 @@ leveldb::Env* LeveldbMemEnv() { return mem_env; } +// flash env leveldb::Env* LeveldbFlashEnv() { static Mutex mutex; static leveldb::Env* flash_env = NULL; diff --git a/src/io/utils_leveldb.h b/src/io/utils_leveldb.h index f77847db9..39e5d73c1 100644 --- a/src/io/utils_leveldb.h +++ 
b/src/io/utils_leveldb.h @@ -18,6 +18,8 @@ void InitDfsEnv(); // return the base env leveldb used (dfs/local), singleton leveldb::Env* LeveldbBaseEnv(); +leveldb::Env* DefaultBlockCacheEnv(); // ssd + base + // return the mem env leveldb used, singleton leveldb::Env* LeveldbMemEnv(); diff --git a/src/leveldb/Makefile b/src/leveldb/Makefile index c9162d2eb..72e322d16 100644 --- a/src/leveldb/Makefile +++ b/src/leveldb/Makefile @@ -19,7 +19,7 @@ include ../../depends.mk include build_config.mk CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT) -CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) +CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++11 LDFLAGS += $(PLATFORM_LDFLAGS) -L$(SNAPPY_LIBDIR) -lrt -ldl -lsnappy LIBS += $(PLATFORM_LIBS) diff --git a/src/leveldb/db/builder.cc b/src/leveldb/db/builder.cc index fdbae74af..5bce6f796 100644 --- a/src/leveldb/db/builder.cc +++ b/src/leveldb/db/builder.cc @@ -137,9 +137,6 @@ Status BuildTable(const std::string& dbname, delete builder; // Finish and check for file errors - if (s.ok()) { - s = file->Sync(); - } if (s.ok()) { s = file->Close(); } diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index c076008de..7d72b617b 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -1179,9 +1179,6 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, compact->builder = NULL; // Finish and check for file errors - if (s.ok()) { - s = compact->outfile->Sync(); - } if (s.ok()) { s = compact->outfile->Close(); } diff --git a/src/leveldb/db/table_cache.cc b/src/leveldb/db/table_cache.cc index e6af0d97b..c9cdb77ea 100644 --- a/src/leveldb/db/table_cache.cc +++ b/src/leveldb/db/table_cache.cc @@ -93,7 +93,7 @@ Status TableCache::FindTable(const std::string& dbname, const Options* options, if (!s.ok()) { assert(table == NULL); - fprintf(stderr, "open sstable file failed: [%s]\n", fname.c_str()); + fprintf(stderr, "open sstable file failed: [%s] %s\n", fname.c_str(), s.ToString().c_str()); delete file; // We do not cache error results so that if the error is transient, // or somebody repairs the file, we recover automatically. diff --git a/src/leveldb/include/leveldb/block_cache.h b/src/leveldb/include/leveldb/block_cache.h new file mode 100644 index 000000000..ebd1b0cf1 --- /dev/null +++ b/src/leveldb/include/leveldb/block_cache.h @@ -0,0 +1,106 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
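The hunks above route LGs onto the new env: DefaultBlockCacheEnv() is a pthread_once singleton wrapping LeveldbBaseEnv(), and SetupOptionsForLG() selects it for FlashStore (and for MemoryStore when tera_use_flash_for_memenv is set) once --tera_tabletnode_block_cache_enabled is on. They also drop the explicit Sync() before Close() in BuildTable()/FinishCompactionOutputFile(), presumably because the DFS-backed envs persist data on Close(). A minimal wiring sketch, assuming one cache directory per SSD (the paths are stand-ins, not flags added by this patch):

    leveldb::BlockCacheEnv* bc_env =
        (leveldb::BlockCacheEnv*)tera::io::DefaultBlockCacheEnv();
    leveldb::BlockCacheOptions opts;  // defaults: 4KB blocks, 128MB datasets,
                                      // ~350GB cache => dataset_num = 2608
    std::vector<std::string> cache_paths;  // stand-in: one dir per SSD
    cache_paths.push_back("/home/ssd0/tcache");
    cache_paths.push_back("/home/ssd1/tcache");
    for (size_t i = 0; i < cache_paths.size(); ++i) {
      // each LoadCache() builds one BlockCacheImpl: a meta leveldb + dataset files
      leveldb::Status s = bc_env->LoadCache(opts, cache_paths[i]);
      assert(s.ok());
    }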
+//
+// Author: caijieming@baidu.com
+
+#ifndef STORAGE_LEVELDB_UTIL_BLOCK_CACHE_H_
+#define STORAGE_LEVELDB_UTIL_BLOCK_CACHE_H_
+
+#include "leveldb/env.h"
+#include "leveldb/options.h"
+#include "leveldb/status.h"
+
+namespace leveldb {
+/////////////////////////////////////////////
+// Tcache
+/////////////////////////////////////////////
+extern uint64_t kBlockSize;
+extern uint64_t kDataSetSize;
+extern uint64_t kFidBatchNum;
+extern uint64_t kCacheSize;
+extern uint64_t kMetaBlockSize;
+extern uint64_t kMetaTableSize;
+extern uint64_t kWriteBufferSize;
+
+struct BlockCacheOptions {
+  Options opts;
+  std::string cache_dir;
+  uint64_t block_size;
+  uint64_t dataset_size;
+  uint64_t fid_batch_num;
+  uint64_t cache_size;
+  uint64_t dataset_num;
+  uint64_t meta_block_cache_size;
+  uint64_t meta_table_cache_size;
+  uint64_t write_buffer_size;
+  Env* env;
+  Env* cache_env;
+
+  BlockCacheOptions()
+      : block_size(kBlockSize),
+        dataset_size(kDataSetSize),
+        fid_batch_num(kFidBatchNum),
+        cache_size(kCacheSize),
+        meta_block_cache_size(kMetaBlockSize),
+        meta_table_cache_size(kMetaTableSize),
+        write_buffer_size(kWriteBufferSize),
+        env(NULL) {
+    dataset_num = cache_size / dataset_size + 1;
+  }
+};
+
+class BlockCacheImpl;
+
+class BlockCacheEnv : public EnvWrapper {
+public:
+  BlockCacheEnv(Env* base);
+
+  ~BlockCacheEnv();
+
+  virtual Status FileExists(const std::string& fname);
+
+  virtual Status GetChildren(const std::string& path,
+                             std::vector<std::string>* result);
+
+  virtual Status DeleteFile(const std::string& fname);
+
+  virtual Status CreateDir(const std::string& name);
+
+  virtual Status DeleteDir(const std::string& name);
+
+  virtual Status CopyFile(const std::string& from,
+                          const std::string& to);
+
+  virtual Status GetFileSize(const std::string& fname, uint64_t* size);
+
+  virtual Status RenameFile(const std::string& src, const std::string& target);
+
+  virtual Status LockFile(const std::string& fname, FileLock** lock);
+
+  virtual Status UnlockFile(FileLock* lock);
+
+  virtual Status NewSequentialFile(const std::string& fname,
+                                   SequentialFile** result);  // never cache log
+
+  // cache related
+  virtual Status NewRandomAccessFile(const std::string& fname,
+                                     RandomAccessFile** result);  // cache Pread
+  virtual Status NewRandomAccessFile(const std::string& fname,
+                                     uint64_t fsize,
+                                     RandomAccessFile** result);  // cache Pread
+
+  virtual Status NewWritableFile(const std::string& fname,
+                                 WritableFile** result);  // cache Append
+  virtual Status LoadCache(const BlockCacheOptions& opts, const std::string& cache_dir);
+
+private:
+  std::vector<BlockCacheImpl*> cache_vec_;
+  Env* dfs_env_;
+};
+
+Env* NewBlockCacheEnv(Env* base);
+
+}  // namespace leveldb
+#endif
+
diff --git a/src/leveldb/include/leveldb/cache.h b/src/leveldb/include/leveldb/cache.h
index 636811b65..2299b2528 100644
--- a/src/leveldb/include/leveldb/cache.h
+++ b/src/leveldb/include/leveldb/cache.h
@@ -29,9 +29,36 @@ namespace leveldb {
 
 class Cache;
 
+// An entry is a variable length heap-allocated structure. Entries
+// are kept in a circular doubly linked list ordered by access time.
+struct LRUHandle {
+  void* value;
+  void (*deleter)(const Slice&, void* value);
+  LRUHandle* next_hash;
+  LRUHandle* next;
+  LRUHandle* prev;
+  size_t charge;      // TODO(opt): Only allow uint32_t?
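cache_id is the field the block cache keys its SSD layout off: New2QCache() appears to assign each inserted entry a stable slot id, which BlockCacheImpl reuses as the block's offset index inside a dataset file. A sketch of that contract as consumed later in this patch (key, block and BlockDeleter as in util/block_cache.cc; the 0xffffffffffffffff sentinel is what GetAndAllocBlock() passes to request a fresh slot):

    leveldb::Cache* cache = leveldb::New2QCache((128UL << 20) / 4096 + 1);  // one dataset
    leveldb::LRUHandle* h = (leveldb::LRUHandle*)cache->Insert(
        key, block, 0xffffffffffffffff, &BlockCacheImpl::BlockDeleter);
    uint64_t slot = h->cache_id;         // stable slot index inside the dataset file
    uint64_t file_offset = slot * 4096;  // where FillCache()/ReadCache() pwrite/pread
    cache->Release((leveldb::Cache::Handle*)h);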
+  size_t key_length;
+  uint32_t refs;
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+  uint64_t cache_id;  // cache id, user spec
+  char key_data[1];   // Beginning of key
+
+  Slice key() const {
+    // For cheaper lookups, we allow a temporary Handle object
+    // to store a pointer to a key in "value".
+    if (next == this) {
+      return *(reinterpret_cast<const Slice*>(value));
+    } else {
+      return Slice(key_data, key_length);
+    }
+  }
+};
+
 // Create a new cache with a fixed size capacity. This implementation
 // of Cache uses a least-recently-used eviction policy.
 extern Cache* NewLRUCache(size_t capacity);
+extern Cache* New2QCache(size_t capacity);
 
 class Cache {
  public:
diff --git a/src/leveldb/include/leveldb/slice.h b/src/leveldb/include/leveldb/slice.h
index 4f1eea30e..286f303f7 100644
--- a/src/leveldb/include/leveldb/slice.h
+++ b/src/leveldb/include/leveldb/slice.h
@@ -68,6 +68,12 @@ class Slice {
     size_ -= n;
   }
 
+  // Drop the last "n" bytes from this slice.
+  void remove_suffix(size_t n) {
+    assert(n <= size());
+    size_ -= n;
+  }
+
   // Return a string that contains the copy of the referenced data.
   std::string ToString() const { return std::string(data_, size_); }
diff --git a/src/leveldb/include/leveldb/statistics.h b/src/leveldb/include/leveldb/statistics.h
new file mode 100644
index 000000000..235192db2
--- /dev/null
+++ b/src/leveldb/include/leveldb/statistics.h
@@ -0,0 +1,112 @@
+// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef STORAGE_LEVELDB_INCLUDE_STATISTICS_H_
+#define STORAGE_LEVELDB_INCLUDE_STATISTICS_H_
+
+#include <stdint.h>
+#include <string.h>
+
+#include <string>
+#include <vector>
+
+namespace leveldb {
+
+/**
+ * Keep adding tickers here.
+ * 1. Any ticker should be added before TICKER_ENUM_MAX.
+ * 2. Add a readable string in TickersNameMap below for the newly added ticker.
+ */
+enum Tickers : uint32_t {
+  TICKER_ENUM_MAX
+};
+
+// The order of items listed in Tickers should be the same as
+// the order listed in TickersNameMap
+const std::vector<std::pair<Tickers, std::string> > TickersNameMap = {
+};
+
+/**
+ * Keep adding histograms here.
+ * Any histogram should have a value less than HISTOGRAM_ENUM_MAX.
+ * Add a new Histogram by assigning it the current value of HISTOGRAM_ENUM_MAX,
+ * add a string representation in HistogramsNameMap below,
+ * and increment HISTOGRAM_ENUM_MAX.
+ */
+enum Histograms : uint32_t {
+  // tera block cache spec
+  TERA_BLOCK_CACHE_PREAD_QUEUE = 0,
+  TERA_BLOCK_CACHE_PREAD_SSD_READ,
+  TERA_BLOCK_CACHE_PREAD_FILL_USER_DATA,
+  TERA_BLOCK_CACHE_PREAD_RELEASE_BLOCK,
+  TERA_BLOCK_CACHE_LOCKMAP_DS_RELOAD_NR,
+  TERA_BLOCK_CACHE_PREAD_GET_BLOCK,
+  TERA_BLOCK_CACHE_PREAD_BLOCK_NR,
+  TERA_BLOCK_CACHE_GET_DATA_SET,
+  TERA_BLOCK_CACHE_DS_LRU_LOOKUP,
+  TERA_BLOCK_CACHE_PREAD_WAIT_UNLOCK,
+  TERA_BLOCK_CACHE_ALLOC_FID,
+  TERA_BLOCK_CACHE_GET_FID,
+  TERA_BLOCK_CACHE_EVICT_NR,
+  TERA_BLOCK_CACHE_PREAD_DFS_READ,
+  TERA_BLOCK_CACHE_PREAD_SSD_WRITE,
+  HISTOGRAM_ENUM_MAX,  // TODO(ldemailly): enforce HistogramsNameMap match
+};
+
+const std::vector<std::pair<Histograms, std::string> > HistogramsNameMap = {
+  {TERA_BLOCK_CACHE_PREAD_QUEUE, "tera.block_cache.pread_queue"},
+  {TERA_BLOCK_CACHE_PREAD_SSD_READ, "tera.block_cache.pread_ssd_read"},
+  {TERA_BLOCK_CACHE_PREAD_FILL_USER_DATA, "tera.block_cache.pread_fill_user_data"},
+  {TERA_BLOCK_CACHE_PREAD_RELEASE_BLOCK, "tera.block_cache.pread_release_block"},
+  {TERA_BLOCK_CACHE_LOCKMAP_DS_RELOAD_NR, "tera.block_cache.lockmap_ds_reload_nr"},
+  {TERA_BLOCK_CACHE_PREAD_GET_BLOCK, "tera.block_cache.pread_get_block"},
+  {TERA_BLOCK_CACHE_PREAD_BLOCK_NR, "tera.block_cache.pread_block_nr"},
+  {TERA_BLOCK_CACHE_GET_DATA_SET, "tera.block_cache.get_data_set"},
+  {TERA_BLOCK_CACHE_DS_LRU_LOOKUP, "tera.block_cache.ds_lru_lookup"},
+  {TERA_BLOCK_CACHE_PREAD_WAIT_UNLOCK, "tera.block_cache.pread_wait_unlock"},
+  {TERA_BLOCK_CACHE_ALLOC_FID, "tera.block_cache.alloc_fid"},
+  {TERA_BLOCK_CACHE_GET_FID, "tera.block_cache.get_fid"},
+  {TERA_BLOCK_CACHE_EVICT_NR, "tera.block_cache.evict_nr"},
+  {TERA_BLOCK_CACHE_PREAD_DFS_READ, "tera.block_cache.pread_dfs_read"},
+  {TERA_BLOCK_CACHE_PREAD_SSD_WRITE, "tera.block_cache.pread_ssd_write"},
+};
+
+struct HistogramData {
+  double median;        // median
+  double percentile95;
+  double percentile99;  // 99th percentile
+  double average;
+  double standard_deviation;
+};
+
+// Analyze the performance of a db
+class Statistics {
+ public:
+  virtual ~Statistics() {}
+
+  virtual int64_t GetTickerCount(uint32_t ticker_type) = 0;
+  virtual void RecordTick(uint32_t ticker_type, uint64_t count = 0) = 0;
+  virtual void SetTickerCount(uint32_t ticker_type, uint64_t count) = 0;
+
+  virtual void GetHistogramData(uint32_t type,
+                                HistogramData* const data) = 0;
+  virtual std::string GetBriefHistogramString(uint32_t type) { return ""; }
+  virtual std::string GetHistogramString(uint32_t type) const { return ""; }
+  virtual void MeasureTime(uint32_t histogram_type, uint64_t time) = 0;
+  virtual void ClearHistogram(uint32_t type) = 0;
+
+  // String representation of the statistic object.
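The block cache instruments every stage of a cached pread through this interface, pairing Env::NowMicros() deltas with MeasureTime(). The pattern, roughly (env stands in for any leveldb::Env):

    leveldb::Statistics* stat = leveldb::CreateDBStatistics();
    uint64_t start_ts = env->NowMicros();
    // ... one pread stage: queueing, SSD read, DFS read, ...
    stat->MeasureTime(leveldb::TERA_BLOCK_CACHE_PREAD_SSD_READ,
                      env->NowMicros() - start_ts);
    leveldb::HistogramData data;
    stat->GetHistogramData(leveldb::TERA_BLOCK_CACHE_PREAD_SSD_READ, &data);
    // data.median, data.percentile95, data.percentile99, data.average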
+ virtual std::string ToString() { + // Do nothing by default + return std::string("ToString(): not implemented"); + } + virtual void ClearAll() = 0; +}; + +// Create a concrete DBStatistics object +Statistics* CreateDBStatistics(); + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_INCLUDE_STATISTICS_H_ diff --git a/src/leveldb/table/format.cc b/src/leveldb/table/format.cc index f4e2e5259..c226a152a 100644 --- a/src/leveldb/table/format.cc +++ b/src/leveldb/table/format.cc @@ -97,7 +97,10 @@ Status ReadBlock(RandomAccessFile* file, const uint32_t actual = crc32c::Value(data, n + 1); if (actual != crc) { delete[] buf; - s = Status::Corruption("block checksum mismatch"); + char err[128] = {'\0'}; + sprintf(err, "block checksum mismatch: crc %u, actual %u, offset %lu, size %lu", + crc, actual, handle.offset(), n + kBlockTrailerSize); + s = Status::Corruption(Slice(err, strlen(err))); return s; } } diff --git a/src/leveldb/util/block_cache.cc b/src/leveldb/util/block_cache.cc new file mode 100644 index 000000000..ab5421e6e --- /dev/null +++ b/src/leveldb/util/block_cache.cc @@ -0,0 +1,1676 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Author: caijieming@baidu.com + +#include "leveldb/block_cache.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "../utils/counter.h" + +#include "db/table_cache.h" +#include "leveldb/db.h" +#include "leveldb/cache.h" +#include "leveldb/env.h" +#include "leveldb/iterator.h" +#include "leveldb/options.h" +#include "leveldb/statistics.h" +#include "leveldb/status.h" +#include "leveldb/table_utils.h" +#include "leveldb/write_batch.h" +#include "port/port.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/mutexlock.h" +#include "util/string_ext.h" +#include "util/thread_pool.h" + +namespace leveldb { + +::tera::Counter tera_block_cache_evict_counter; + +///////////////////////////////////////////// +// t-cache impl +///////////////////////////////////////////// +uint64_t kBlockSize = 4096UL; +uint64_t kDataSetSize = 128UL << 20; +uint64_t kFidBatchNum = 100000UL; +uint64_t kCacheSize = 350000000000UL; +uint64_t kMetaBlockSize = 2000UL; +uint64_t kMetaTableSize = 500UL; +uint64_t kWriteBufferSize = 1048576UL; + +class BlockCacheWritableFile; +class BlockCacheRandomAccessFile; +class BlockCacheImpl; + +// Each SSD will New a BlockCache +// block state +uint64_t kCacheBlockValid = 0x1; +uint64_t kCacheBlockLocked = 0x2; +uint64_t kCacheBlockDfsRead = 0x4; +uint64_t kCacheBlockCacheRead = 0x8; +uint64_t kCacheBlockCacheFill = 0x10; + +struct CacheBlock { + uint64_t fid; + uint64_t block_idx; + uint64_t sid; + uint64_t cache_block_idx; + volatile uint64_t state; + port::Mutex mu; + port::CondVar cv; + Slice data_block; + bool data_block_alloc; + uint64_t data_block_refs; + LRUHandle* handle; + LRUHandle* data_set_handle; + Status s; + + CacheBlock() + : fid(0), + block_idx(0), + sid(0xffffffffffffffff), + cache_block_idx(0xffffffffffffffff), + state(0), + cv(&mu), + data_block_alloc(false), + data_block_refs(0), + handle(NULL), + data_set_handle(NULL) { + } + + bool Test(uint64_t c_state) { + mu.AssertHeld(); + return (state & c_state) == c_state; + } + + void Clear(uint64_t c_state) { + mu.AssertHeld(); + state &= ~c_state; + } + + void Set(uint64_t c_state) { + mu.AssertHeld(); + state |= c_state; + } + + void WaitOnClear(uint64_t c_state) { // access 
in lock + mu.AssertHeld(); + while (Test(c_state)) { + cv.Wait(); + } + } + + // access in cache lock + void GetDataBlock(uint64_t block_size, Slice data) { + if (data_block_refs == 0) { // first one alloc mem + assert(data_block.size() == 0); + assert(data_block_alloc == false); + if (data.size() == 0) { + char* buf = new char[block_size]; + data = Slice(buf, block_size); + data_block_alloc = true; + } + data_block = data; + } + ++data_block_refs; + } + + // access in cache lock + void ReleaseDataBlock() { + --data_block_refs; + if (data_block_refs == 0) { + if (data_block_alloc) { + char* data = (char*)data_block.data(); + delete[] data; + data_block_alloc = false; + } + data_block = Slice(); + } + } + + void DecodeFrom(Slice record) { + fid = DecodeFixed64(record.data()); + record.remove_prefix(sizeof(uint64_t)); + block_idx = DecodeFixed64(record.data()); + record.remove_prefix(sizeof(uint64_t)); + state = DecodeFixed64(record.data()); + return; + } + + const std::string Encode() { + std::string r; + PutFixed64(&r, fid); + PutFixed64(&r, block_idx); + PutFixed64(&r, state); + return r; + } + + const std::string ToString() { + std::stringstream ss; + ss << "CacheBlock(" << (uint64_t)this << "): fid: " << fid << ", block_idx: " << block_idx + << ", sid: " << sid << ", cache_block_idx: " << cache_block_idx + << ", state " << state << ", status " << s.ToString(); + return ss.str(); + } +}; + +struct DataSet { + LRUHandle* h; + port::Mutex mu; + Cache* cache; + int fd; + + DataSet(): h(NULL), cache(NULL), fd(-1) {} +}; + +class BlockCacheImpl { +public: + BlockCacheImpl(const BlockCacheOptions& options); + + ~BlockCacheImpl(); + + const std::string& WorkPath(); + + Status LoadCache(); // init cache + + Status NewWritableFile(const std::string& fname, + WritableFile** result); + + Status NewRandomAccessFile(const std::string& fname, + uint64_t fsize, + RandomAccessFile** result); // cache Pread + + static void BlockDeleter(const Slice& key, void* v); + + static void BGControlThreadFunc(void* arg); + + Status DeleteFile(const std::string& fname); + +private: + friend struct DataSet; + struct LockContent; + + Status LockAndPut(LockContent& lc); + + Status GetContentAfterWait(LockContent& lc); + + Status PutContentAfterLock(LockContent& lc); + + Status ReloadDataSet(LockContent& lc); + + Status FillCache(CacheBlock* block); + + Status ReadCache(CacheBlock* block, struct aiocb* aio_context); + + uint64_t AllocFileId(); // no more than fid_batch_num + + uint64_t FileId(const std::string& fname); + + DataSet* GetDataSet(uint64_t sid); + + CacheBlock* GetAndAllocBlock(uint64_t fid, uint64_t block_idx); + + Status LogRecord(CacheBlock* block); + + Status ReleaseBlock(CacheBlock* block, bool need_sync); + + void BGControlThread(); + +private: + friend class BlockCacheWritableFile; + friend class BlockCacheRandomAccessFile; + friend struct CacheBlock; + + BlockCacheOptions options_; + std::string work_path_; + Env* dfs_env_; + //Env* posix_env_; + + port::Mutex mu_; + // key lock list + struct Waiter { + int wait_num; // protected by BlockCacheImpl.mu_ + + port::Mutex mu; + port::CondVar cv; + bool done; + Waiter(): wait_num(0), cv(&mu), done(false) {} + + void Wait() { + MutexLock l(&mu); + while (!done) { cv.Wait(); } + } + + void SignalAll() { + MutexLock l(&mu); + done = true; + cv.SignalAll(); + } + }; + typedef std::map LockKeyMap; + LockKeyMap lock_key_; + + uint64_t new_fid_; + uint64_t prev_fid_; + + enum LockKeyType { + kDBKey = 0, + kDataSetKey = 1, + kDeleteDBKey = 2, + }; + struct 
LockContent { + int type; + + // DB key + Slice db_lock_key; + Slice db_lock_val; + std::string* db_val; + + // data set id + uint64_t sid; + DataSet* data_set; + + const std::string Encode() { + if (type == kDBKey || type == kDeleteDBKey) { + return db_lock_key.ToString(); + } else if (type == kDataSetKey) { + std::string key = "DS#"; + PutFixed64(&key, sid); + return key; + } + return ""; + } + + const std::string KeyToString() { + if (type == kDBKey || type == kDeleteDBKey) { + return db_lock_key.ToString(); + } else if (type == kDataSetKey) { + std::stringstream ss; + ss << "DS#" << sid; + return ss.str(); + } else { + return ""; + } + } + + const std::string ValToString() { + if (type == kDBKey) { + uint64_t val = DecodeFixed64(db_lock_val.data()); + std::stringstream ss; + ss << val; + return ss.str(); + } + return ""; + } + }; + Cache* data_set_cache_; + + Statistics* stat_; + //WritableFile* logfile_; + //log::Writer* log_; + DB* db_; // store meta + ThreadPool bg_fill_; + ThreadPool bg_read_; + ThreadPool bg_dfs_read_; + ThreadPool bg_flush_; + ThreadPool bg_control_; +}; + +// Must insure not init more than twice +Env* NewBlockCacheEnv(Env* base) { + return new BlockCacheEnv(base); +} + +BlockCacheEnv::BlockCacheEnv(Env* base) + : EnvWrapper(NewPosixEnv()), dfs_env_(base) { + //target()->SetBackgroundThreads(30); +} + +BlockCacheEnv::~BlockCacheEnv() {} + +Status BlockCacheEnv::FileExists(const std::string& fname) { + return dfs_env_->FileExists(fname); +} + +Status BlockCacheEnv::GetChildren(const std::string& path, + std::vector* result) { + return dfs_env_->GetChildren(path, result); +} + +Status BlockCacheEnv::DeleteFile(const std::string& fname) { + if (fname.rfind(".sst") == fname.size() - 4) { + uint32_t hash = (Hash(fname.c_str(), fname.size(), 13)) % cache_vec_.size(); + BlockCacheImpl* cache = cache_vec_[hash]; + cache->DeleteFile(fname); + } + return dfs_env_->DeleteFile(fname); +} + +Status BlockCacheEnv::CreateDir(const std::string& name) { + return dfs_env_->CreateDir(name); +} + +Status BlockCacheEnv::DeleteDir(const std::string& name) { + return dfs_env_->DeleteDir(name); +} + +Status BlockCacheEnv::CopyFile(const std::string& from, + const std::string& to) { + return dfs_env_->CopyFile(from, to); +} + +Status BlockCacheEnv::GetFileSize(const std::string& fname, uint64_t* size) { + return dfs_env_->GetFileSize(fname, size); +} + +Status BlockCacheEnv::RenameFile(const std::string& src, const std::string& target) { + return dfs_env_->RenameFile(src, target); +} + +Status BlockCacheEnv::LockFile(const std::string& fname, FileLock** lock) { + return dfs_env_->LockFile(fname, lock); +} + +Status BlockCacheEnv::UnlockFile(FileLock* lock) { + return dfs_env_->UnlockFile(lock); +} + +Status BlockCacheEnv::LoadCache(const BlockCacheOptions& opts, const std::string& cache_dir) { + BlockCacheOptions options = opts; + options.cache_dir = cache_dir; + options.env = dfs_env_; + options.cache_env = this->target(); + BlockCacheImpl* cache = new BlockCacheImpl(options); + Status s = cache->LoadCache(); + cache_vec_.push_back(cache); // no need lock + return s; +} + +Status BlockCacheEnv::NewSequentialFile(const std::string& fname, + SequentialFile** result) { + return dfs_env_->NewSequentialFile(fname, result); +} + +Status BlockCacheEnv::NewWritableFile(const std::string& fname, + WritableFile** result) { + if (fname.rfind(".sst") != fname.size() - 4) { + return dfs_env_->NewWritableFile(fname, result); + } + + // cache sst file + uint32_t hash = (Hash(fname.c_str(), 
fname.size(), 13)) % cache_vec_.size(); + BlockCacheImpl* cache = cache_vec_[hash]; + Status s = cache->NewWritableFile(fname, result); + if (!s.ok()) { + Log("[block_cache %s] open file write fail: %s, hash: %u, status: %s\n", + cache->WorkPath().c_str(), fname.c_str(), hash, s.ToString().c_str()); + } + return s; +} + +Status BlockCacheEnv::NewRandomAccessFile(const std::string& fname, + RandomAccessFile** result) { + //uint32_t hash = (Hash(fname.c_str(), fname.size(), 13)) % cache_vec_.size(); + //BlockCacheImpl* cache = cache_vec_[hash]; + //Status s = cache->NewRandomAccessFile(fname, result); + //if (!s.ok()) { + // Log("[block_cache %s] open file read fail: %s, hash: %u, status: %s\n", + // cache->WorkPath().c_str(), fname.c_str(), hash, s.ToString().c_str()); + //} + //return s; + abort(); + return Status::OK(); +} + +Status BlockCacheEnv::NewRandomAccessFile(const std::string& fname, + uint64_t fsize, + RandomAccessFile** result) { + uint32_t hash = (Hash(fname.c_str(), fname.size(), 13)) % cache_vec_.size(); + BlockCacheImpl* cache = cache_vec_[hash]; + Status s = cache->NewRandomAccessFile(fname, fsize, result); + if (!s.ok()) { + Log("[block_cache %s] open file read fail: %s, hash: %u, status: %s, fsize %lu\n", + cache->WorkPath().c_str(), fname.c_str(), hash, s.ToString().c_str(), fsize); + } + return s; +} + +class BlockCacheWriteBuffer { +public: + BlockCacheWriteBuffer(const std::string& path, + const std::string& file, + int block_size) + : offset_(0), + block_size_(block_size), + block_idx_(0), + tmp_storage_(NULL), + path_(path), + file_(file) { + } + + ~BlockCacheWriteBuffer() { + assert(block_list_.size() == 0); + } + + uint32_t NumFullBlock() { // use for BGFlush + MutexLock l(&mu_); + if (block_list_.size() == 0) { + return 0; + } else if ((block_list_.back())->size() < block_size_) { + return block_list_.size() - 1; + } else { + return block_list_.size(); + } + } + + Status Append(const Slice& data) { + MutexLock l(&mu_); + if (tmp_storage_ == NULL) { + tmp_storage_ = new std::string(); + block_list_.push_back(tmp_storage_); + } + uint32_t begin = offset_ / block_size_; + uint32_t end = (offset_ + data.size()) / block_size_; + if (begin == end) { // in the same block + tmp_storage_->append(data.data(), data.size()); + } else { + uint32_t tmp_size = block_size_ - (offset_ % block_size_); + tmp_storage_->append(data.data(), tmp_size); + assert(tmp_storage_->size() == block_size_); + Slice buf(data.data() + tmp_size, data.size() - tmp_size); + for (uint32_t i = begin + 1; i <= end; ++i) { + tmp_storage_ = new std::string(); + block_list_.push_back(tmp_storage_); + if (i < end) { + tmp_storage_->append(buf.data(), block_size_); + buf.remove_prefix(block_size_); + } else { // last block + tmp_storage_->append(buf.data(), buf.size()); + buf.remove_prefix(buf.size()); + } + //Log("[%s] add tmp_storage %s: offset: %lu, buf_size: %lu, idx %u\n", + // path_.c_str(), + // file_.c_str(), + // offset_, + // buf.size(), i); + } + } + offset_ += data.size(); + //Log("[%s] add record: %s, begin: %u, end: %u, offset: %lu, data_size: %lu, block_size: %u\n", + // path_.c_str(), + // file_.c_str(), + // begin, end, + // offset_ - data.size() , data.size(), block_size_); + return Status::OK(); + } + + std::string* PopFrontBlock(uint64_t* block_idx) { + MutexLock l(&mu_); + if (block_list_.size() == 0) { + return NULL; + } + std::string* block = block_list_.front(); + assert(block->size() <= block_size_); + if (block->size() != block_size_) { + return NULL; + } + 
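A worked example of the Append()/Pop*Block() behavior above, assuming block_size = 4096 and p pointing at 10000 bytes:

    BlockCacheWriteBuffer wb("/home/ssd0/tcache", "tablet00/0/1.sst", 4096);
    wb.Append(leveldb::Slice(p, 10000));  // split into blocks [4096][4096][1808]
    wb.NumFullBlock();                    // == 2: the 1808-byte tail is not full
    uint64_t idx = 0;
    std::string* b = wb.PopFrontBlock(&idx);  // first full block, idx == 0
    // PopFrontBlock() returns NULL once only the partial tail remains (the
    // size check right above); Close() drains that tail via PopBackBlock().
    wb.ReleaseBlock(b);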
block_list_.pop_front(); + *block_idx = block_idx_; + block_idx_++; + return block; + } + + std::string* PopBackBlock(uint64_t* block_idx) { + MutexLock l(&mu_); + if (block_list_.size() == 0) { + return NULL; + } + std::string* block = block_list_.back(); + block_list_.pop_back(); + *block_idx = offset_ / block_size_; + return block; + } + + void ReleaseBlock(std::string* block) { + delete block; + } + +private: + port::Mutex mu_; + uint64_t offset_; + uint32_t block_size_; + uint64_t block_idx_; + std::string* tmp_storage_; + std::list block_list_; // kBlockSize + std::string path_; + std::string file_; +}; + +class BlockCacheWritableFile : public WritableFile { +public: + BlockCacheWritableFile(BlockCacheImpl* c, const std::string& fname, Status* s) + : cache_(c), + bg_cv_(&mu_), + bg_block_flush_(0), + pending_block_num_(0), + write_buffer_(cache_->WorkPath(), fname, cache_->options_.block_size), + fname_(fname) { // file open + *s = cache_->dfs_env_->NewWritableFile(fname_, &dfs_file_); + if (!s->ok()) { + Log("[%s] dfs open: %s, block_size: %lu, status: %s\n", + cache_->WorkPath().c_str(), + fname.c_str(), + cache_->options_.block_size, + s->ToString().c_str()); + } + bg_status_ = *s; + fid_ = cache_->FileId(fname_); + return; + } + + ~BlockCacheWritableFile() { Close(); } + + Status Append(const Slice& data) { + Status s = dfs_file_->Append(data); + if (!s.ok()) { + Log("[%s] dfs append fail: %s, status: %s\n", + cache_->WorkPath().c_str(), + fname_.c_str(), + s.ToString().c_str()); + return s; + } + write_buffer_.Append(data); + + MutexLock lockgard(&mu_); + MaybeScheduleBGFlush(); + return s; + } + + Status Close() { + Status s, s1; + if (dfs_file_ != NULL) { + s = dfs_file_->Close(); + delete dfs_file_; + dfs_file_ = NULL; + } + + uint64_t block_idx; + std::string* block_data = write_buffer_.PopBackBlock(&block_idx); + if (block_data != NULL) { + s1 = FillCache(block_data, block_idx); + } + + MutexLock lockgard(&mu_); + while (bg_block_flush_ > 0) { + bg_cv_.Wait(); + } + if (bg_status_.ok()) { + bg_status_ = s.ok() ? s1: s; + } + //Log("[%s] end close %s, status %s\n", cache_->WorkPath().c_str(), fname_.c_str(), + // s.ToString().c_str()); + return bg_status_; + } + + Status Flush() { + //Log("[%s] dfs flush: %s\n", cache_->WorkPath().c_str(), fname_.c_str()); + return dfs_file_->Flush(); + } + + Status Sync() { + //Log("[%s] dfs sync: %s\n", cache_->WorkPath().c_str(), fname_.c_str()); + return dfs_file_->Sync(); + } + +private: + void MaybeScheduleBGFlush() { + mu_.AssertHeld(); + //Log("[%s] Maybe schedule BGFlush: %s, bg_block_flush: %u, block_nr: %u\n", + // cache_->WorkPath().c_str(), + // fname_.c_str(), + // bg_block_flush_, + // write_buffer_.NumFullBlock()); + while (bg_block_flush_ < (write_buffer_.NumFullBlock() + pending_block_num_)) { + bg_block_flush_++; + cache_->bg_flush_.Schedule(&BlockCacheWritableFile::BGFlushFunc, this, 10); + } + } + + static void BGFlushFunc(void* arg) { + reinterpret_cast(arg)->BGFlush(); + } + void BGFlush() { + //Log("[%s] Begin BGFlush: %s\n", cache_->WorkPath().c_str(), fname_.c_str()); + Status s; + MutexLock lockgard(&mu_); + uint64_t block_idx; + std::string* block_data = write_buffer_.PopFrontBlock(&block_idx); + if (block_data != NULL) { + pending_block_num_++; + mu_.Unlock(); + + s = FillCache(block_data, block_idx); + mu_.Lock(); + pending_block_num_--; + } + + bg_status_ = bg_status_.ok() ? 
s: bg_status_; + bg_block_flush_--; + MaybeScheduleBGFlush(); + bg_cv_.Signal(); + return; + } + + Status FillCache(std::string* block_data, uint64_t block_idx) { + Status s; + uint64_t fid = fid_; + CacheBlock* block = NULL; + while ((block = cache_->GetAndAllocBlock(fid, block_idx)) == NULL) { + Log("[%s] fill cache for write %s, fid %lu, block_idx %lu, wait 10ms after retry\n", + cache_->WorkPath().c_str(), fname_.c_str(), + fid, block_idx); + cache_->options_.cache_env->SleepForMicroseconds(10000); + } + + block->mu.Lock(); + block->state = 0; + block->GetDataBlock(cache_->options_.block_size, Slice(*block_data)); + block->mu.Unlock(); + + // Do io without lock + block->s = cache_->LogRecord(block); + if (block->s.ok()) { + block->s = cache_->FillCache(block); + if (block->s.ok()) { + MutexLock l(&block->mu); + block->state = kCacheBlockValid; + } + } + s = cache_->ReleaseBlock(block, true); + write_buffer_.ReleaseBlock(block_data); + return s; + } + +private: + BlockCacheImpl* cache_; + //port::AtomicPointer shutting_down_; + port::Mutex mu_; + port::CondVar bg_cv_; // Signalled when background work finishes + Status bg_status_; + WritableFile* dfs_file_; + // protected by cache_.mu_ + uint32_t bg_block_flush_; + uint32_t pending_block_num_; + BlockCacheWriteBuffer write_buffer_; + std::string fname_; + uint64_t fid_; +}; + +class BlockCacheRandomAccessFile : public RandomAccessFile { +public: + BlockCacheRandomAccessFile(BlockCacheImpl* c, const std::string& fname, + uint64_t fsize, Status* s) + : cache_(c), + fname_(fname), + fsize_(fsize) { + *s = cache_->dfs_env_->NewRandomAccessFile(fname_, &dfs_file_); + //Log("[%s] dfs open for read: %s, block_size: %lu, status: %s\n", + // cache_->WorkPath().c_str(), + // fname.c_str(), + // cache_->options_.block_size, + // s->ToString().c_str()); + + fid_ = cache_->FileId(fname_); + aio_enabled_ = false; + return; + } + + ~BlockCacheRandomAccessFile() { + delete dfs_file_; + } + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + Status s; + uint64_t begin = offset / cache_->options_.block_size; + uint64_t end = (offset + n) / cache_->options_.block_size; + assert(begin <= end); + uint64_t fid = fid_; + std::vector c_miss; + std::vector c_locked; + std::vector c_valid; + std::vector block_queue; + + //Log("[%s] Begin Pread %s, size %lu, offset %lu, fid %lu, start_block %lu, end_block %lu" + // ", block_size %lu\n", + // cache_->WorkPath().c_str(), fname_.c_str(), n, offset, fid, + // begin, end, cache_->options_.block_size); + + uint64_t start_ts = cache_->options_.cache_env->NowMicros(); + for (uint64_t block_idx = begin; block_idx <= end; ++block_idx) { + uint64_t get_block_ts = cache_->options_.cache_env->NowMicros(); + CacheBlock* block = NULL; + while ((block = cache_->GetAndAllocBlock(fid, block_idx)) == NULL) { + Log("[%s] fill cache for read %s, fid %lu, block_idx %lu, wait 10ms after retry\n", + cache_->WorkPath().c_str(), fname_.c_str(), + fid, block_idx); + cache_->options_.cache_env->SleepForMicroseconds(10000); + } + + block->mu.Lock(); + assert(block->fid == fid && block->block_idx == block_idx); + block->GetDataBlock(cache_->options_.block_size, Slice()); + block_queue.push_back(block); // sort by block_idx + if (!block->Test(kCacheBlockLocked) && + block->Test(kCacheBlockValid)) { + block->Set(kCacheBlockLocked | kCacheBlockCacheRead); + c_valid.push_back(block); + } else if (!block->Test(kCacheBlockLocked)) { + block->Set(kCacheBlockLocked | kCacheBlockDfsRead); + c_miss.push_back(block); + } 
else { + c_locked.push_back(block); + } + block->mu.Unlock(); + + //Log("[%s] Queue block: %s, refs %u, data_block_refs %lu, alloc %u\n", + // cache_->WorkPath().c_str(), block->ToString().c_str(), + // block->handle->refs, block->data_block_refs, + // block->data_block_alloc); + cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_GET_BLOCK, + cache_->options_.cache_env->NowMicros() - get_block_ts); + } + uint64_t queue_ts = cache_->options_.cache_env->NowMicros(); + cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_QUEUE, queue_ts - start_ts); + cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_BLOCK_NR, end - begin + 1); + + // async read miss data + for (uint32_t i = 0; i < c_miss.size(); ++i) { + CacheBlock* block = c_miss[i]; + AsyncDfsReader* reader = new AsyncDfsReader; + reader->file = const_cast(this); + reader->block = block; + //Log("[%s] pread in miss list, %s\n", + // cache_->WorkPath().c_str(), + // block->ToString().c_str()); + cache_->bg_dfs_read_.Schedule(&BlockCacheRandomAccessFile::AsyncDfsRead, reader, 10); + } + //uint64_t miss_read_sched_ts = cache_->options_.cache_env->NowMicros(); + + // async read valid data + for (uint32_t i = 0; i < c_valid.size(); ++i) { + CacheBlock* block = c_valid[i]; + AsyncCacheReader* reader = new AsyncCacheReader; + reader->file = const_cast(this); + reader->block = block; + //Log("[%s] pread in valid list, %s\n", + // cache_->WorkPath().c_str(), + // block->ToString().c_str()); + if (aio_enabled_) { + AioCacheRead(reader); + } else { + cache_->bg_read_.Schedule(&BlockCacheRandomAccessFile::AsyncCacheRead, reader, 10); + } + } + //uint64_t ssd_read_sched_ts = cache_->options_.cache_env->NowMicros(); + + // wait async cache read done + for (uint32_t i = 0; i < c_valid.size(); ++i) { + CacheBlock* block = c_valid[i]; + block->mu.Lock(); + block->WaitOnClear(kCacheBlockCacheRead); + assert(block->Test(kCacheBlockValid)); + if (!block->s.ok() && s.ok()) { + s = block->s; // degrade read + } + block->Clear(kCacheBlockLocked); + block->cv.SignalAll(); + block->mu.Unlock(); + //Log("[%s] cache read done, %s\n", + // cache_->WorkPath().c_str(), + // block->ToString().c_str()); + } + uint64_t ssd_read_ts = cache_->options_.cache_env->NowMicros(); + cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_SSD_READ, ssd_read_ts - queue_ts); + + // wait dfs read done and async cache file + for (uint32_t i = 0; i < c_miss.size(); ++i) { + CacheBlock* block = c_miss[i]; + block->mu.Lock(); + block->WaitOnClear(kCacheBlockDfsRead); + block->Set(kCacheBlockCacheFill); + if (!block->s.ok() && s.ok()) { + s = block->s; // degrade read + } + block->mu.Unlock(); + //Log("[%s] dfs read done, %s\n", + // cache_->WorkPath().c_str(), + // block->ToString().c_str()); + } + uint64_t dfs_read_ts = cache_->options_.cache_env->NowMicros(); + cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_DFS_READ, dfs_read_ts - ssd_read_ts); + + for (uint32_t i = 0; i < c_miss.size(); ++i) { + CacheBlock* block = c_miss[i]; + AsyncCacheWriter* writer = new AsyncCacheWriter; + writer->file = const_cast(this); + writer->block = block; + //Log("[%s] pread in miss list(fill cache), %s\n", + // cache_->WorkPath().c_str(), + // block->ToString().c_str()); + cache_->bg_fill_.Schedule(&BlockCacheRandomAccessFile::AsyncCacheWrite, writer, 10); + } + uint64_t ssd_write_sched_ts = cache_->options_.cache_env->NowMicros(); + //cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_SSD_WRITE_SCHED, ssd_write_sched_ts - dfs_read_ts); + + for (uint32_t i = 0; i < c_miss.size(); ++i) { // wait cache fill finish 
+ CacheBlock* block = c_miss[i]; + block->mu.Lock(); + block->WaitOnClear(kCacheBlockCacheFill); + if (block->s.ok()) { + block->Set(kCacheBlockValid); + } else if (s.ok()) { + s = block->s; // degrade read + } + block->Clear(kCacheBlockLocked); + block->cv.SignalAll(); + block->mu.Unlock(); + //Log("[%s] cache fill done, %s\n", + // cache_->WorkPath().c_str(), + // block->ToString().c_str()); + } + uint64_t ssd_write_ts = cache_->options_.cache_env->NowMicros(); + cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_SSD_WRITE, ssd_write_ts - ssd_write_sched_ts); + + // wait other async read finish + for (uint32_t i = 0; i < c_locked.size(); ++i) { + CacheBlock* block = c_locked[i]; + block->mu.Lock(); + block->WaitOnClear(kCacheBlockLocked); + block->mu.Unlock(); + //Log("[%s] wait locked done, %s\n", + // cache_->WorkPath().c_str(), + // block->ToString().c_str()); + } + uint64_t wait_unlock_ts = cache_->options_.cache_env->NowMicros(); + cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_WAIT_UNLOCK, wait_unlock_ts - ssd_write_ts); + + // fill user mem + size_t msize = 0; + for (uint64_t block_idx = begin; block_idx <= end; ++block_idx) { + CacheBlock* block = block_queue[block_idx - begin]; + Slice data_block = block->data_block; + if (block_idx == begin) { + data_block.remove_prefix(offset % cache_->options_.block_size); + } + if (block_idx == end) { + data_block.remove_suffix(cache_->options_.block_size - (n + offset) % cache_->options_.block_size); + } + memcpy(scratch + msize, data_block.data(), data_block.size()); + msize += data_block.size(); + //Log("[%s] Fill user data, %s, fill_offset %lu, fill_size %lu, prefix %lu, suffix %lu, msize %lu, offset %lu\n", + // cache_->WorkPath().c_str(), fname_.c_str(), + // block_idx * cache_->options_.block_size + (block_idx == begin ? offset % cache_->options_.block_size: 0), + // data_block.size(), + // block_idx == begin ? offset % cache_->options_.block_size: 0, + // block_idx == end ? 
cache_->options_.block_size - (n + offset) % cache_->options_.block_size + // : cache_->options_.block_size, + // msize, offset); + } + assert(msize == n); + *result = Slice(scratch, n); + uint64_t fill_user_data_ts = cache_->options_.cache_env->NowMicros(); + cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_FILL_USER_DATA, fill_user_data_ts - wait_unlock_ts); + + for (uint32_t i = 0; i < c_miss.size(); ++i) { + CacheBlock* block = c_miss[i]; + //Log("[%s] wakeup for miss, %s\n", cache_->WorkPath().c_str(), block->ToString().c_str()); + cache_->ReleaseBlock(block, true); + } + for (uint32_t i = 0; i < c_valid.size(); ++i) { + CacheBlock* block = c_valid[i]; + //Log("[%s] wakeup for valid, %s\n", cache_->WorkPath().c_str(), block->ToString().c_str()); + cache_->ReleaseBlock(block, false); + } + for (uint32_t i = 0; i < c_locked.size(); ++i) { + CacheBlock* block = c_locked[i]; + //Log("[%s] wakeup for lock, %s\n", cache_->WorkPath().c_str(), block->ToString().c_str()); + cache_->ReleaseBlock(block, false); + } + uint64_t release_cache_block_ts = cache_->options_.cache_env->NowMicros(); + cache_->stat_->MeasureTime(TERA_BLOCK_CACHE_PREAD_RELEASE_BLOCK, release_cache_block_ts - fill_user_data_ts); + + if (!s.ok()) { + s = dfs_file_->Read(offset, n, result, scratch); + Log("[%s] Pread degrade %s, offset %lu, size %lu, status %s\n", + cache_->WorkPath().c_str(), fname_.c_str(), + offset, n, s.ToString().c_str()); + } + //Log("[%s] Done Pread %s, size %lu, offset %lu, fid %lu, res %lu, status %s, start_block %lu, end_block %lu" + // ", block_size %lu\n", + // cache_->WorkPath().c_str(), fname_.c_str(), n, offset, fid, + // result->size(), s.ToString().c_str(), + // begin, end, cache_->options_.block_size); + return s; + } + +private: + struct AsyncDfsReader { + BlockCacheRandomAccessFile* file; + CacheBlock* block; + }; + static void AsyncDfsRead(void* arg) { + AsyncDfsReader* reader = (AsyncDfsReader*)arg; + reader->file->HandleDfsRead(reader); + delete reader; + return; + } + void HandleDfsRead(AsyncDfsReader* reader) { + Status s; + CacheBlock* block = reader->block; + char* scratch = (char*)(block->data_block.data()); + Slice result; + uint64_t offset = block->block_idx * cache_->options_.block_size; + size_t n = cache_->options_.block_size; + block->s = dfs_file_->Read(offset, n, &result, scratch); + if (!block->s.ok()) { + Log("[%s] dfs read, %s" + ", offset %lu, size %lu, status %s, res %lu\n", + cache_->WorkPath().c_str(), block->ToString().c_str(), + offset, n, + block->s.ToString().c_str(), result.size()); + } + + block->mu.Lock(); + block->Clear(kCacheBlockDfsRead); + block->cv.SignalAll(); + block->mu.Unlock(); + return; + } + + struct AsyncCacheReader { + BlockCacheRandomAccessFile* file; + CacheBlock* block; + + // aio spec + struct aiocb aio_context; + }; + + // use use thread module to enhance sync io + static void AsyncCacheRead(void* arg) { + AsyncCacheReader* reader = (AsyncCacheReader*)arg; + reader->file->HandleCacheRead(reader); + delete reader; + } + void HandleCacheRead(AsyncCacheReader* reader) { + CacheBlock* block = reader->block; + block->s = cache_->ReadCache(block, NULL); + + block->mu.Lock(); + block->Clear(kCacheBlockCacheRead); + block->cv.SignalAll(); + block->mu.Unlock(); + //Log("[%s] async.cacheread signal, %s\n", cache_->WorkPath().c_str(), + // block->ToString().c_str()); + } + + // support posix aio engine + static void AioCacheReadCallback(sigval_t sigval) { // kernel create thread + AsyncCacheReader* reader = (AsyncCacheReader*)sigval.sival_ptr; + 
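Context for the callback above: with SIGEV_THREAD, glibc invokes the notify function on an internal thread once the aio_read() issued by AioCacheRead() below completes, so the handler must only touch thread-safe state. The same pattern, self-contained (fd, buf, n and off are stand-ins; a single in-flight request for brevity):

    #include <aio.h>
    #include <string.h>

    static void OnDone(sigval_t sv) {               // runs on a glibc thread
      struct aiocb* cb = (struct aiocb*)sv.sival_ptr;
      if (aio_error(cb) == 0) {                     // 0 once the op finished OK
        ssize_t n_read = aio_return(cb);            // bytes actually read
        (void)n_read;
      }
    }

    void SubmitRead(int fd, char* buf, size_t n, off_t off) {
      static struct aiocb cb;                       // must outlive the request
      memset(&cb, 0, sizeof(cb));
      cb.aio_fildes = fd;
      cb.aio_buf = buf;
      cb.aio_nbytes = n;
      cb.aio_offset = off;
      cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
      cb.aio_sigevent.sigev_notify_function = OnDone;
      cb.aio_sigevent.sigev_value.sival_ptr = &cb;
      aio_read(&cb);                                // returns immediately
    }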
reader->file->HandleAioCacheReadCallback(reader); + delete reader; + } + void HandleAioCacheReadCallback(AsyncCacheReader* reader) { + CacheBlock* block = reader->block; + assert(aio_error(&reader->aio_context) == 0); + //while (aio_error(reader->aio_context) == EINPROGRESS); + ssize_t res = aio_return(&reader->aio_context); + block->s = res < 0? Status::Corruption("AioReadCache error") : Status::OK(); + if (!block->s.ok()) { + Log("[%s] aio.cacheread signal, %s\n", cache_->WorkPath().c_str(), + block->ToString().c_str()); + } + + block->mu.Lock(); + block->Clear(kCacheBlockCacheRead); + block->cv.SignalAll(); + block->mu.Unlock(); + } + void AioCacheRead(AsyncCacheReader* reader) const { + // setup sigevent + memset((char*)(&reader->aio_context), 0, sizeof(struct aiocb)); + reader->aio_context.aio_sigevent.sigev_notify = SIGEV_THREAD; + reader->aio_context.aio_sigevent.sigev_notify_function = &BlockCacheRandomAccessFile::AioCacheReadCallback; + reader->aio_context.aio_sigevent.sigev_notify_attributes = NULL; + reader->aio_context.aio_sigevent.sigev_value.sival_ptr = reader; + + cache_->ReadCache(reader->block, &reader->aio_context); + } + + struct AsyncCacheWriter { + BlockCacheRandomAccessFile* file; + CacheBlock* block; + }; + static void AsyncCacheWrite(void* arg) { + AsyncCacheWriter* writer = (AsyncCacheWriter*)arg; + writer->file->HandleCacheWrite(writer); + delete writer; + return; + } + void HandleCacheWrite(AsyncCacheWriter* writer) { + CacheBlock* block = writer->block; + //Log("[%s] cache fill, %s\n", + // cache_->WorkPath().c_str(), + // block->ToString().c_str()); + block->s = cache_->LogRecord(block); + if (block->s.ok()) { + block->s = cache_->FillCache(block); + } + + block->mu.Lock(); + block->Clear(kCacheBlockCacheFill); + block->cv.SignalAll(); + block->mu.Unlock(); + return; + } + +private: + BlockCacheImpl* cache_; + RandomAccessFile* dfs_file_; + std::string fname_; + uint64_t fid_; + uint64_t fsize_; + bool aio_enabled_; +}; + +// t-cache implementation +BlockCacheImpl::BlockCacheImpl(const BlockCacheOptions& options) + : options_(options), + dfs_env_(options.env), + new_fid_(0), + prev_fid_(0), + db_(NULL) { + bg_fill_.SetBackgroundThreads(30); + bg_read_.SetBackgroundThreads(30); + bg_dfs_read_.SetBackgroundThreads(30); + bg_flush_.SetBackgroundThreads(30); + bg_control_.SetBackgroundThreads(2); + stat_ = CreateDBStatistics(); +} + +BlockCacheImpl::~BlockCacheImpl() { + delete stat_; +} + +void BlockCacheImpl::BGControlThreadFunc(void* arg) { + reinterpret_cast(arg)->BGControlThread(); +} + +void BlockCacheImpl::BGControlThread() { + stat_->MeasureTime(TERA_BLOCK_CACHE_EVICT_NR, + tera_block_cache_evict_counter.Clear()); + + Log("[%s] statistics: " + "%s, %s, %s, %s, %s, " + "%s, %s, %s, %s, %s, " + "%s, %s, %s, %s, %s\n", + this->WorkPath().c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_PREAD_QUEUE).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_PREAD_SSD_READ).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_PREAD_DFS_READ).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_PREAD_SSD_WRITE).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_PREAD_FILL_USER_DATA).c_str(), + + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_PREAD_RELEASE_BLOCK).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_LOCKMAP_DS_RELOAD_NR).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_PREAD_GET_BLOCK).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_PREAD_BLOCK_NR).c_str(), + 
stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_GET_DATA_SET).c_str(), + + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_DS_LRU_LOOKUP).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_PREAD_WAIT_UNLOCK).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_ALLOC_FID).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_GET_FID).c_str(), + stat_->GetBriefHistogramString(TERA_BLOCK_CACHE_EVICT_NR).c_str()); + + Log("[%s] statistics(meta): " + "table_cache: %lf/%lu/%lu, " + "block_cache: %lf/%lu/%lu\n", + this->WorkPath().c_str(), + options_.opts.table_cache->HitRate(true), + options_.opts.table_cache->TableEntries(), + options_.opts.table_cache->ByteSize(), + options_.opts.block_cache->HitRate(true), + options_.opts.block_cache->Entries(), + options_.opts.block_cache->TotalCharge()); + + // resched after 6s + stat_->ClearHistogram(TERA_BLOCK_CACHE_PREAD_QUEUE); + stat_->ClearHistogram(TERA_BLOCK_CACHE_PREAD_SSD_READ); + stat_->ClearHistogram(TERA_BLOCK_CACHE_PREAD_DFS_READ); + stat_->ClearHistogram(TERA_BLOCK_CACHE_PREAD_SSD_WRITE); + stat_->ClearHistogram(TERA_BLOCK_CACHE_PREAD_FILL_USER_DATA); + stat_->ClearHistogram(TERA_BLOCK_CACHE_PREAD_RELEASE_BLOCK); + stat_->ClearHistogram(TERA_BLOCK_CACHE_LOCKMAP_DS_RELOAD_NR); + stat_->ClearHistogram(TERA_BLOCK_CACHE_PREAD_GET_BLOCK); + stat_->ClearHistogram(TERA_BLOCK_CACHE_PREAD_BLOCK_NR); + stat_->ClearHistogram(TERA_BLOCK_CACHE_GET_DATA_SET); + stat_->ClearHistogram(TERA_BLOCK_CACHE_DS_LRU_LOOKUP); + stat_->ClearHistogram(TERA_BLOCK_CACHE_PREAD_WAIT_UNLOCK); + stat_->ClearHistogram(TERA_BLOCK_CACHE_ALLOC_FID); + stat_->ClearHistogram(TERA_BLOCK_CACHE_GET_FID); + stat_->ClearHistogram(TERA_BLOCK_CACHE_EVICT_NR); + bg_control_.Schedule(&BlockCacheImpl::BGControlThreadFunc, this, 10, 6000); +} + +Status BlockCacheImpl::NewWritableFile(const std::string& fname, + WritableFile** result) { + Status s; + BlockCacheWritableFile* file = new BlockCacheWritableFile(this, fname, &s); + *result = NULL; + if (s.ok()) { + *result = (WritableFile*)file; + } + return s; +} + +Status BlockCacheImpl::NewRandomAccessFile(const std::string& fname, + uint64_t fsize, + RandomAccessFile** result) { + Status s; + BlockCacheRandomAccessFile* file = new BlockCacheRandomAccessFile(this, fname, fsize, &s); + *result = NULL; + if (s.ok()) { + *result = (RandomAccessFile*)file; + } + return s; +} + +void BlockCacheImpl::BlockDeleter(const Slice& key, void* v) { + CacheBlock* block = (CacheBlock*)v; + //Log("Evict blockcache: %s\n", block->ToString().c_str()); + delete block; + tera_block_cache_evict_counter.Inc(); + return; +} + +// if lock succ, put lock_val, else get newer value +Status BlockCacheImpl::LockAndPut(LockContent& lc) { + mu_.AssertHeld(); + Status s; + std::string key; + if ((key = lc.Encode()) == "") { + return Status::NotSupported("key type error"); + } + //Log("[%s] trylock key: %s\n", + // this->WorkPath().c_str(), + // key.c_str()); + + Waiter* w = NULL; + LockKeyMap::iterator it = lock_key_.find(key); + if (it != lock_key_.end()) { + w = it->second; + w->wait_num ++; + mu_.Unlock(); + w->Wait(); + + s = GetContentAfterWait(lc); + mu_.Lock(); + if (--w->wait_num == 0) { + // last thread wait for open + lock_key_.erase(key); + //Log("[%s] wait done %s, delete cv\n", + // this->WorkPath().c_str(), + // key.c_str()); + delete w; + } else { + //Log("[%s] wait done %s, not last\n", + // this->WorkPath().c_str(), + // key.c_str()); + } + } else { + w = new Waiter; + w->wait_num = 1; + lock_key_[key] = w; + mu_.Unlock(); + + s = 
PutContentAfterLock(lc); + mu_.Lock(); + if (--w->wait_num == 0) { + lock_key_.erase(key); + //Log("[%s] put done %s, no wait thread\n", + // this->WorkPath().c_str(), + // key.c_str()); + delete w; + } else { + mu_.Unlock(); + //Log("[%s] put done %s, signal all wait thread\n", + // this->WorkPath().c_str(), + // key.c_str()); + w->SignalAll(); + + mu_.Lock(); + } + } + return s; +} + +Status BlockCacheImpl::GetContentAfterWait(LockContent& lc) { + Status s; + std::string key = lc.Encode(); + + if (lc.type == kDBKey) { + ReadOptions r_opts; + s = db_->Get(r_opts, key, lc.db_val); + //Log("[%s] get lock key: %s, val: %s, status: %s\n", + // this->WorkPath().c_str(), + // key.c_str(), + // lc.db_val->c_str(), + // s.ToString().c_str()); + } else if (lc.type == kDataSetKey) { + std::string ds_key; + PutFixed64(&ds_key, lc.sid); + LRUHandle* ds_handle = (LRUHandle*)data_set_cache_->Lookup(ds_key); + lc.data_set = reinterpret_cast(data_set_cache_->Value((Cache::Handle*)ds_handle)); + assert(ds_handle == lc.data_set->h); + //Log("[%s] get dataset sid: %lu\n", + // this->WorkPath().c_str(), + // lc.sid); + } + return s; +} + +Status BlockCacheImpl::PutContentAfterLock(LockContent& lc) { + Status s; + std::string key = lc.Encode(); + + if (lc.type == kDBKey) { + WriteOptions w_opts; + s = db_->Put(w_opts, key, lc.db_lock_val); + if (s.ok()) { + lc.db_val->append(lc.db_lock_val.data(), lc.db_lock_val.size()); + } + //Log("[%s] Insert db key : %s, val %s, status %s\n", + // this->WorkPath().c_str(), + // lc.KeyToString().c_str(), + // lc.ValToString().c_str(), + // s.ToString().c_str()); + } else if (lc.type == kDeleteDBKey) { + WriteOptions w_opts; + s = db_->Delete(w_opts, key); + //Log("[%s] Delete db key : %s, val %s, status %s\n", + // this->WorkPath().c_str(), + // lc.KeyToString().c_str(), + // lc.ValToString().c_str(), + // s.ToString().c_str()); + } else if (lc.type == kDataSetKey) { // cannot double insert + std::string ds_key; + PutFixed64(&ds_key, lc.sid); + LRUHandle* ds_handle = (LRUHandle*)data_set_cache_->Lookup(ds_key); + if (ds_handle != NULL) { + lc.data_set = reinterpret_cast(data_set_cache_->Value((Cache::Handle*)ds_handle)); + assert(ds_handle == lc.data_set->h); + } else { + s = ReloadDataSet(lc); + } + } + return s; +} + +Status BlockCacheImpl::ReloadDataSet(LockContent& lc) { + Status s; + std::string key = lc.Encode(); + + lc.data_set = new DataSet; + lc.data_set->cache = New2QCache((options_.dataset_size / options_.block_size) + 1);// number of blocks in DS + std::string file = options_.cache_dir + "/" + Uint64ToString(lc.sid); + lc.data_set->fd = open(file.c_str(), O_RDWR | O_CREAT, 0644); + assert(lc.data_set->fd > 0); + Log("[%s] New DataSet %s, file: %s, nr_block: %lu, fd: %d\n", + this->WorkPath().c_str(), + lc.KeyToString().c_str(), + file.c_str(), (options_.dataset_size / options_.block_size) + 1, + lc.data_set->fd); + + // reload hash lru + uint64_t total_items = 0; + ReadOptions s_opts; + leveldb::Iterator* db_it = db_->NewIterator(s_opts); + for (db_it->Seek(key); + db_it->Valid() && db_it->key().starts_with("DS#"); + db_it->Next()) { + Slice lkey = db_it->key(); + uint64_t sid, cbi; + lkey.remove_prefix(3);// lkey = DS#, sid, cbi + sid = DecodeFixed64(lkey.data()); + lkey.remove_prefix(sizeof(uint64_t)); + cbi = DecodeFixed64(lkey.data()); + //Slice lval = db_it->value(); + if (sid != lc.sid) { + break; + } + total_items++; + + CacheBlock* block = new CacheBlock; + block->DecodeFrom(db_it->value()); // get fid and block_idx + std::string hkey; + 
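For reference while reading this recovery loop and the FileId()/AllocFileId()/LogRecord() helpers below, the meta DB holds three key families (sid and cache_block_idx are stand-ins here):

    //   "FID#"                                  -> Fixed64(fid allocation watermark)
    //   "FNAME#" + <dfs file name>              -> Fixed64(fid)
    //   "DS#" + Fixed64(sid) + Fixed64(cb_idx)  -> CacheBlock::Encode()
    //                                              = Fixed64 fid, block_idx, state
    std::string key = "DS#";
    PutFixed64(&key, sid);              // dataset id
    PutFixed64(&key, cache_block_idx);  // slot within the dataset file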
PutFixed64(&hkey, block->fid); + PutFixed64(&hkey, block->block_idx); + block->sid = sid; + block->cache_block_idx = cbi; + block->state = (block->Test(kCacheBlockValid)) ? kCacheBlockValid : 0; + //Log("[%s] Recovery %s, insert cacheblock into 2QLru, %s\n", + // this->WorkPath().c_str(), + // lc.KeyToString().c_str(), + // block->ToString().c_str()); + LRUHandle* handle = (LRUHandle*)(lc.data_set->cache->Insert(hkey, block, cbi, &BlockCacheImpl::BlockDeleter)); + assert((uint64_t)(lc.data_set->cache->Value((Cache::Handle*)handle)) == (uint64_t)block); + assert(handle->cache_id == block->cache_block_idx); + block->handle = handle; + lc.data_set->cache->Release((Cache::Handle*)handle); + } + delete db_it; + stat_->MeasureTime(TERA_BLOCK_CACHE_LOCKMAP_DS_RELOAD_NR, total_items); + + std::string ds_key; + PutFixed64(&ds_key, lc.sid); + LRUHandle* ds_handle = (LRUHandle*)data_set_cache_->Insert(ds_key, lc.data_set, 1, NULL); + assert(ds_handle != NULL); + lc.data_set->h = ds_handle; + return s; +} + +const std::string& BlockCacheImpl::WorkPath() { + return work_path_; +} + +Status BlockCacheImpl::LoadCache() { + // open meta file + work_path_ = options_.cache_dir; + std::string dbname = options_.cache_dir + "/meta"; + options_.opts.env = options_.cache_env; // local write + options_.opts.filter_policy = NewBloomFilterPolicy(10); + options_.opts.block_cache = leveldb::NewLRUCache(options_.meta_block_cache_size * 1024UL * 1024); + options_.opts.table_cache = new leveldb::TableCache(options_.meta_table_cache_size * 1024UL * 1024); + options_.opts.write_buffer_size = options_.write_buffer_size; + options_.opts.info_log = Logger::DefaultLogger(); + Log("[block_cache %s] open meta db: block_cache: %lu, table_cache: %lu\n", + dbname.c_str(), + options_.meta_block_cache_size, + options_.meta_table_cache_size); + Status s = DB::Open(options_.opts, dbname, &db_); + assert(s.ok()); + data_set_cache_ = leveldb::NewLRUCache(128 * options_.dataset_num + 1); + + // recover fid + std::string key = "FID#"; + std::string val; + ReadOptions r_opts; + s = db_->Get(r_opts, key, &val); + if (!s.ok()) { + prev_fid_ = 0; + } else { + prev_fid_ = DecodeFixed64(val.c_str()); + } + new_fid_ = prev_fid_ + options_.fid_batch_num; + Log("[block_cache %s]: reuse block cache: prev_fid: %lu, new_fid: %lu\n", + dbname.c_str(), prev_fid_, new_fid_); + + bg_control_.Schedule(&BlockCacheImpl::BGControlThreadFunc, this, 10, 6000); + s = Status::OK(); + return s; +} + +Status BlockCacheImpl::FillCache(CacheBlock* block) { + uint64_t cache_block_idx = block->cache_block_idx; + DataSet* ds = reinterpret_cast(data_set_cache_->Value((Cache::Handle*)block->data_set_handle)); + int fd = ds->fd; + + // do io without lock + ssize_t res = pwrite(fd, block->data_block.data(), block->data_block.size(), + cache_block_idx * options_.block_size); + + if (res < 0) { + Log("[%s] cache fill: sid %lu, dataset.fd %d, datablock size %lu, cb_idx %lu, %s, res %ld\n", + this->WorkPath().c_str(), block->sid, fd, block->data_block.size(), + cache_block_idx, + block->ToString().c_str(), + res); + return Status::Corruption("FillCache error"); + } + return Status::OK(); +} + +Status BlockCacheImpl::ReadCache(CacheBlock* block, struct aiocb* aio_context) { + uint64_t cache_block_idx = block->cache_block_idx; + DataSet* ds = reinterpret_cast(data_set_cache_->Value((Cache::Handle*)block->data_set_handle)); + int fd = ds->fd; + + // do io without lock + ssize_t res = 0; + if (aio_context != NULL) { // support aio engine + aio_context->aio_fildes = fd; + 
+Status BlockCacheImpl::ReadCache(CacheBlock* block, struct aiocb* aio_context) {
+    uint64_t cache_block_idx = block->cache_block_idx;
+    DataSet* ds = reinterpret_cast<DataSet*>(data_set_cache_->Value((Cache::Handle*)block->data_set_handle));
+    int fd = ds->fd;
+
+    // do io without lock
+    ssize_t res = 0;
+    if (aio_context != NULL) { // support aio engine
+        aio_context->aio_fildes = fd;
+        aio_context->aio_buf = (char*)block->data_block.data();
+        aio_context->aio_nbytes = block->data_block.size();
+        aio_context->aio_offset = cache_block_idx * options_.block_size;
+        res = aio_read(aio_context);
+    } else {
+        res = pread(fd, (char*)block->data_block.data(), block->data_block.size(),
+                    cache_block_idx * options_.block_size);
+    }
+
+    if (res < 0) {
+        Log("[%s] cache read: sid %lu, dataset.fd %d, datablock size %lu, cb_idx %lu, %s, res %ld\n",
+            this->WorkPath().c_str(), block->sid, fd, block->data_block.size(),
+            cache_block_idx,
+            block->ToString().c_str(),
+            res);
+        return Status::Corruption("ReadCache error");
+    }
+    return Status::OK();
+}
+
+uint64_t BlockCacheImpl::AllocFileId() { // no more than fid_batch_num
+    mu_.AssertHeld();
+    uint64_t start_ts = options_.cache_env->NowMicros();
+    uint64_t fid = ++new_fid_;
+    while (new_fid_ - prev_fid_ >= options_.fid_batch_num) {
+        std::string key = "FID#";
+        std::string lock_val;
+        PutFixed64(&lock_val, new_fid_);
+        std::string val;
+
+        LockContent lc;
+        lc.type = kDBKey;
+        lc.db_lock_key = key;
+        lc.db_lock_val = lock_val;
+        lc.db_val = &val;
+        Status s = LockAndPut(lc);
+        if (s.ok()) {
+            prev_fid_ = DecodeFixed64(val.c_str());
+        }
+        //Log("[%s] alloc fid: key %s, new_fid: %lu, prev_fid: %lu\n",
+        //    this->WorkPath().c_str(),
+        //    key.c_str(),
+        //    new_fid_,
+        //    prev_fid_);
+    }
+    stat_->MeasureTime(TERA_BLOCK_CACHE_ALLOC_FID,
+                       options_.cache_env->NowMicros() - start_ts);
+    return fid;
+}
+
+uint64_t BlockCacheImpl::FileId(const std::string& fname) {
+    uint64_t fid = 0;
+    std::string key = "FNAME#" + fname;
+    uint64_t start_ts = options_.cache_env->NowMicros();
+    ReadOptions r_opts;
+    std::string val;
+
+    Status s = db_->Get(r_opts, key, &val);
+    if (!s.ok()) { // not exist
+        MutexLock l(&mu_);
+        fid = AllocFileId();
+        std::string v;
+        PutFixed64(&val, fid);
+
+        LockContent lc;
+        lc.type = kDBKey;
+        lc.db_lock_key = key;
+        lc.db_lock_val = val;
+        lc.db_val = &v;
+        //Log("[%s] alloc fid: %lu, key: %s",
+        //    this->WorkPath().c_str(),
+        //    fid, key.c_str());
+        s = LockAndPut(lc);
+        assert(s.ok());
+        fid = DecodeFixed64(v.c_str());
+    } else { // fid in cache
+        fid = DecodeFixed64(val.c_str());
+    }
+
+    //Log("[%s] Fid: %lu, fname: %s\n",
+    //    this->WorkPath().c_str(),
+    //    fid, fname.c_str());
+    stat_->MeasureTime(TERA_BLOCK_CACHE_GET_FID,
+                       options_.cache_env->NowMicros() - start_ts);
+    return fid;
+}
+
+Status BlockCacheImpl::DeleteFile(const std::string& fname) {
+    Status s;
+    std::string key = "FNAME#" + fname;
+    ReadOptions r_opts;
+    std::string val;
+    //s = db_->Get(r_opts, key, &val);
+    //if (!s.ok()) { // not exist
+    {
+        MutexLock l(&mu_);
+        LockContent lc;
+        lc.type = kDeleteDBKey;
+        lc.db_lock_key = key;
+        s = LockAndPut(lc);
+    }
+    return s;
+}
+
+DataSet* BlockCacheImpl::GetDataSet(uint64_t sid) {
+    std::string key;
+    PutFixed64(&key, sid);
+    DataSet* set = NULL;
+    uint64_t start_ts = options_.cache_env->NowMicros();
+
+    LRUHandle* h = (LRUHandle*)data_set_cache_->Lookup(key);
+    if (h == NULL) {
+        MutexLock l(&mu_);
+        LockContent lc;
+        lc.type = kDataSetKey;
+        lc.sid = sid;
+        lc.data_set = NULL;
+        Status s = LockAndPut(lc);
+        set = lc.data_set;
+    } else {
+        //Log("[%s] get dataset from memcache, sid %lu\n",
+        //    this->WorkPath().c_str(), sid);
+        set = reinterpret_cast<DataSet*>(data_set_cache_->Value((Cache::Handle*)h));
+        assert(set->h == h);
+    }
+    stat_->MeasureTime(TERA_BLOCK_CACHE_GET_DATA_SET,
+                       options_.cache_env->NowMicros() - start_ts);
+    return set;
+}
+
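+// GetAndAllocBlock() maps (fid, block_idx) to a dataset: both ids are encoded
+// as fixed64s, hashed, and taken modulo dataset_num to pick the DataSet; the
+// block is then looked up in, or newly inserted into, that dataset's 2Q cache.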
+CacheBlock* BlockCacheImpl::GetAndAllocBlock(uint64_t fid, uint64_t block_idx) {
+    std::string key;
+    PutFixed64(&key, fid);
+    PutFixed64(&key, block_idx);
+    uint32_t hash = Hash(key.c_str(), key.size(), 7);
+    uint64_t sid = hash % options_.dataset_num;
+
+    //Log("[%s] alloc block, try get dataset, fid: %lu, block_idx: %lu, hash: %u, sid %lu, dataset_num: %lu\n",
+    //    this->WorkPath().c_str(), fid, block_idx, hash, sid, options_.dataset_num);
+    CacheBlock* block = NULL;
+    DataSet* ds = GetDataSet(sid); // get and alloc ds
+    Cache* cache = ds->cache;
+
+    uint64_t start_ts = options_.cache_env->NowMicros();
+    ds->mu.Lock();
+    LRUHandle* h = (LRUHandle*)cache->Lookup(key);
+    if (h == NULL) {
+        block = new CacheBlock;
+        block->fid = fid;
+        block->block_idx = block_idx;
+        block->sid = sid;
+        h = (LRUHandle*)cache->Insert(key, block, 0xffffffffffffffff, &BlockCacheImpl::BlockDeleter);
+        if (h != NULL) {
+            assert((uint64_t)(cache->Value((Cache::Handle*)h)) == (uint64_t)block);
+            block->cache_block_idx = h->cache_id;
+            block->handle = h;
+            block->data_set_handle = ds->h;
+            //Log("[%s] Alloc Block: %s, sid %lu, fid %lu, block_idx %lu, hash %u, usage: %lu/%lu\n",
+            //    this->WorkPath().c_str(),
+            //    block->ToString().c_str(),
+            //    sid, fid, block_idx, hash,
+            //    cache->TotalCharge(),
+            //    options_.dataset_size / options_.block_size + 1);
+        } else {
+            delete block;
+            block = NULL;
+            assert(0);
+        }
+    } else {
+        block = reinterpret_cast<CacheBlock*>(cache->Value((Cache::Handle*)h));
+        block->data_set_handle = block->data_set_handle == NULL ? ds->h : block->data_set_handle;
+    }
+    ds->mu.Unlock();
+
+    data_set_cache_->Release((Cache::Handle*)ds->h);
+    stat_->MeasureTime(TERA_BLOCK_CACHE_DS_LRU_LOOKUP,
+                       options_.cache_env->NowMicros() - start_ts);
+    return block;
+}
+
+Status BlockCacheImpl::LogRecord(CacheBlock* block) {
+    std::string key = "DS#";
+    PutFixed64(&key, block->sid);
+    PutFixed64(&key, block->cache_block_idx);
+    leveldb::WriteBatch batch;
+    batch.Put(key, block->Encode());
+    return db_->Write(leveldb::WriteOptions(), &batch);
+}
+
+Status BlockCacheImpl::ReleaseBlock(CacheBlock* block, bool need_sync) {
+    Status s;
+    if (need_sync) { // TODO: dump meta into memtable
+        s = LogRecord(block);
+    }
+
+    block->mu.Lock();
+    block->ReleaseDataBlock();
+    block->s = Status::OK(); // clear io status
+    block->cv.SignalAll();
+    block->mu.Unlock();
+
+    //Log("[%s] release block: %s\n", this->WorkPath().c_str(), block->ToString().c_str());
+    LRUHandle* h = block->handle;
+    DataSet* ds = reinterpret_cast<DataSet*>(data_set_cache_->Value((Cache::Handle*)block->data_set_handle));
+    ds->cache->Release((Cache::Handle*)h);
+    return s;
+}
+
+} // namespace leveldb
+
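The cache.cc hunk below removes LRUHandle from cache.cc's anonymous namespace
(block_cache.cc above reads handle fields such as cache_id, so the struct
presumably moves to a shared header) and adds LRU2QCache, a fixed-slot cache
behind the existing Cache interface. A minimal usage sketch, not from the
patch; `nr_blocks`, `key`, `block`, and `DeleteBlock` are placeholders, and
the capacity mirrors ReloadDataSet's "(dataset_size / block_size) + 1" sizing:

    // Sketch only: one entry per slot in the dataset's cache file.
    leveldb::Cache* cache = leveldb::New2QCache(nr_blocks);
    // cache_id 0xffffffffffffffff asks LRU2QCache to assign the next free slot.
    leveldb::Cache::Handle* h =
        cache->Insert(key, block, 0xffffffffffffffff, &DeleteBlock);
    uint64_t slot = ((leveldb::LRUHandle*)h)->cache_id; // slot in the cache file
    cache->Release(h);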
diff --git a/src/leveldb/util/cache.cc b/src/leveldb/util/cache.cc
index 6eab478a1..b7c7ca4e0 100644
--- a/src/leveldb/util/cache.cc
+++ b/src/leveldb/util/cache.cc
@@ -25,31 +25,6 @@ namespace {
 
 // LRU cache implementation
 
-// An entry is a variable length heap-allocated structure. Entries
-// are kept in a circular doubly linked list ordered by access time.
-struct LRUHandle {
-  void* value;
-  void (*deleter)(const Slice&, void* value);
-  LRUHandle* next_hash;
-  LRUHandle* next;
-  LRUHandle* prev;
-  size_t charge;      // TODO(opt): Only allow uint32_t?
-  size_t key_length;
-  uint32_t refs;
-  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
-  char key_data[1];   // Beginning of key
-
-  Slice key() const {
-    // For cheaper lookups, we allow a temporary Handle object
-    // to store a pointer to a key in "value".
-    if (next == this) {
-      return *(reinterpret_cast<Slice*>(value));
-    } else {
-      return Slice(key_data, key_length);
-    }
-  }
-};
-
 // We provide our own simple hash table since it removes a whole bunch
 // of porting hacks and is also faster than some of the built-in hash
 // table implementations in some of the compiler/runtime combinations
@@ -286,6 +261,182 @@ size_t LRUCache::TotalCharge() {
   return usage_;
 }
 
+class LRU2QCache: public Cache {
+ public:
+  explicit LRU2QCache(size_t capacity)
+      : capacity_(capacity),
+        usage_(0),
+        max_cache_id_(0) {
+    // Make empty circular linked list
+    lru_.next = &lru_;
+    lru_.prev = &lru_;
+  }
+
+  ~LRU2QCache() {}
+
+  // Like Cache methods, but with an extra "hash" parameter.
+  // Notice: insert if absent; if present, return the old one.
+  Cache::Handle* Insert(const Slice& key, void* value, size_t cache_id,
+                        void (*deleter)(const Slice& key, void* value)) {
+    const uint32_t hash = HashSlice(key);
+    MutexLock l(&mutex_);
+    LRUHandle* e = NULL;
+    //e = (LRUHandle*)DoLookup(key, hash);
+    //if (e != NULL) {
+    //  assert(0);
+    //  return reinterpret_cast<Cache::Handle*>(e);
+    //}
+
+    if (usage_ < capacity_) { // cache not full
+      e = reinterpret_cast<LRUHandle*>(
+          malloc(sizeof(LRUHandle) - 1 + key.size()));
+      e->value = value;
+      e->deleter = deleter;
+      e->charge = 1;
+      e->key_length = key.size();
+      e->hash = hash;
+      e->refs = 2;  // One from LRU2QCache, one for the returned handle
+      e->cache_id = cache_id == 0xffffffffffffffff ? usage_ : cache_id;
+      memcpy(e->key_data, key.data(), key.size());
+      max_cache_id_ = max_cache_id_ < e->cache_id ? e->cache_id : max_cache_id_;
+
+      LRU_Append(e);
+      LRUHandle* old_entry = table_.Insert(e);
+      assert(old_entry == NULL);  // keep the insert outside assert(): it must run under NDEBUG
+      (void)old_entry;
+      usage_++;
+      return reinterpret_cast<Cache::Handle*>(e);
+    }
+    assert(max_cache_id_ + 1 == usage_);
+    assert(usage_ == capacity_);
+    //fprintf(stderr, "%lu, usage %lu, capacity %lu\n", (uint64_t)this, usage_, capacity_);
+
+    // cache full, reuse item
+    LRUHandle* old = lru_.next;
+    while (old != &lru_) {
+      if (old->refs > 1) {
+        old = old->next;
+        continue;
+      }
+      e = reinterpret_cast<LRUHandle*>(
+          malloc(sizeof(LRUHandle) - 1 + key.size()));
+      e->value = value;
+      e->deleter = deleter;
+      e->charge = 1;
+      e->key_length = key.size();
+      e->hash = hash;
+      e->refs = 2;  // One from LRU2QCache, one for the returned handle
+      e->cache_id = old->cache_id;
+      memcpy(e->key_data, key.data(), key.size());
+
+      LRU_Remove(old);
+      table_.Remove(old->key(), old->hash);
+      Unref(old);
+
+      LRU_Append(e);
+      LRUHandle* old_entry = table_.Insert(e);
+      assert(old_entry == NULL);
+      (void)old_entry;
+      usage_++;
+      return reinterpret_cast<Cache::Handle*>(e);
+    }
+    return NULL;
+  }
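+  // Note on the reuse policy above: when the cache is full, Insert() walks
+  // from the LRU end toward the MRU end and recycles the first entry with no
+  // external references, handing its cache_id (its slot in the cache file)
+  // to the new entry. If every entry is pinned, Insert() returns NULL and
+  // the caller (GetAndAllocBlock) currently asserts.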
+  Cache::Handle* Lookup(const Slice& key) {
+    const uint32_t hash = HashSlice(key);
+    MutexLock l(&mutex_);
+    return DoLookup(key, hash);
+  }
+
+  void Erase(const Slice& key) {
+    const uint32_t hash = HashSlice(key);
+    MutexLock l(&mutex_);
+    LRUHandle* e = table_.Remove(key, hash);
+    if (e != NULL) {
+      LRU_Remove(e);
+      Unref(e);
+    }
+  }
+
+  void Release(Cache::Handle* handle) {
+    MutexLock l(&mutex_);
+    Unref(reinterpret_cast<LRUHandle*>(handle));
+  }
+
+  void* Value(Cache::Handle* handle) {
+    return reinterpret_cast<LRUHandle*>(handle)->value;
+  }
+
+  uint64_t NewId() {
+    return 0;
+  }
+
+  double HitRate(bool force_clear = false) {
+    return 99.9999;
+  }
+
+  size_t Entries() {
+    MutexLock l(&mutex_);
+    return usage_;
+  }
+
+  size_t TotalCharge() {
+    MutexLock l(&mutex_);
+    return usage_;
+  }
+
+ private:
+  Cache::Handle* DoLookup(const Slice& key, uint32_t hash) {
+    LRUHandle* e = table_.Lookup(key, hash);
+    if (e != NULL) {
+      e->refs++;
+      LRU_Remove(e);
+      LRU_Append(e);
+    }
+    return reinterpret_cast<Cache::Handle*>(e);
+  }
+
+  void LRU_Remove(LRUHandle* e) {
+    e->next->prev = e->prev;
+    e->prev->next = e->next;
+  }
+
+  void LRU_Append(LRUHandle* e) {
+    // Make "e" newest entry by inserting just before lru_
+    e->next = &lru_;
+    e->prev = lru_.prev;
+    e->prev->next = e;
+    e->next->prev = e;
+  }
+
+  void Unref(LRUHandle* e) {
+    assert(e->refs > 0);
+    e->refs--;
+    if (e->refs <= 0) {
+      usage_ -= e->charge;
+      (*e->deleter)(e->key(), e->value);
+      free(e);
+    }
+  }
+
+  inline uint32_t HashSlice(const Slice& s) {
+    return Hash(s.data(), s.size(), 0);
+  }
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  port::Mutex mutex_;
+  size_t usage_;
+  uint64_t max_cache_id_;
+
+  // Dummy head of LRU list.
+  // lru.prev is newest entry, lru.next is oldest entry.
+  //LRUHandle hot_lru_;
+  //LRUHandle cold_lru_;
+  LRUHandle lru_;
+
+  HandleTable table_;
+};
+
 static const int kNumShardBits = 4;
 static const int kNumShards = 1 << kNumShardBits;
 
@@ -382,4 +533,8 @@ Cache* NewLRUCache(size_t capacity) {
   return new ShardedLRUCache(capacity);
 }
 
+Cache* New2QCache(size_t capacity) {
+  return new LRU2QCache(capacity);
+}
+
 } // namespace leveldb
diff --git a/src/leveldb/util/coding_test.cc b/src/leveldb/util/coding_test.cc
index fc8fbf5c9..17848377b 100644
--- a/src/leveldb/util/coding_test.cc
+++ b/src/leveldb/util/coding_test.cc
@@ -219,6 +219,17 @@ TEST(Coding, PutLG_ugly) {
   ASSERT_EQ(a_slice.ToString(), b_slice.ToString());
 }
 
+TEST(Coding, PutFixed64Cmp) {
+  std::string sa, sb;
+  PutFixed64(&sa, 100);
+  PutFixed64(&sb, 50);
+  ASSERT_TRUE(sa > sb);
+  uint64_t a = DecodeFixed64(sa.c_str());
+  uint64_t b = DecodeFixed64(sb.c_str());
+  ASSERT_TRUE(a == 100);
+  ASSERT_TRUE(b == 50);
+}
+
 } // namespace leveldb
 
 int main(int argc, char** argv) {
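A caveat on PutFixed64Cmp above: PutFixed64 writes little-endian bytes, so the
lexicographic order of two encodings tracks numeric order only when the values
differ in the low-order byte (as 100 and 50 do here); for example, the encoding
of 256 compares below that of 1. The DS#/FID keys built with PutFixed64
elsewhere in this patch rely only on prefix equality plus the explicit sid
check in ReloadDataSet, not on numeric ordering, so this test pins the current
behavior rather than a general ordering guarantee.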
diff --git a/src/leveldb/util/statistics.cc b/src/leveldb/util/statistics.cc
new file mode 100644
index 000000000..130b06311
--- /dev/null
+++ b/src/leveldb/util/statistics.cc
@@ -0,0 +1,114 @@
+// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "leveldb/statistics.h"
+#include <stdio.h>
+#include "util/histogram.h"
+#include "../utils/counter.h"
+
+namespace leveldb {
+
+class StatisticsImpl : public Statistics {
+public:
+    StatisticsImpl() {}
+
+    ~StatisticsImpl() {}
+
+    virtual int64_t GetTickerCount(uint32_t ticker_type) {
+        return counter_[ticker_type].Get();
+    }
+
+    virtual void RecordTick(uint32_t ticker_type, uint64_t count = 0) {
+        counter_[ticker_type].Add(count);
+    }
+
+    virtual void SetTickerCount(uint32_t ticker_type, uint64_t count) {
+        counter_[ticker_type].Set(count);
+    }
+
+    virtual void MeasureTime(uint32_t type, uint64_t time) {
+        hist_[type].Add(time);
+    }
+
+    virtual void GetHistogramData(uint32_t type,
+                                  HistogramData* const data) {
+        data->median = hist_[type].Median();
+        data->percentile95 = hist_[type].Percentile(95);
+        data->percentile99 = hist_[type].Percentile(99);
+        data->average = hist_[type].Average();
+        data->standard_deviation = hist_[type].StandardDeviation();
+    }
+
+    virtual std::string GetHistogramString(uint32_t type) const {
+        return hist_[type].ToString();
+    }
+
+    virtual std::string GetBriefHistogramString(uint32_t type) {
+        assert(HistogramsNameMap[type].first == type);
+
+        std::string res;
+        char buffer[200];
+        HistogramData hData;
+        GetHistogramData(type, &hData);
+        snprintf(buffer,
+                 200,
+                 "%s :=> %f(%f)",
+                 HistogramsNameMap[type].second.c_str(),
+                 hData.average,
+                 hData.percentile99 - hData.median);
+        res.append(buffer);
+        res.shrink_to_fit();
+        return res;
+    }
+
+    void ClearHistogram(uint32_t type) {
+        hist_[type].Clear();
+    }
+
+    // String representation of the statistic object.
+    virtual std::string ToString() {
+        std::string res;
+        res.reserve(20000);
+        for (uint32_t i = 0; i < TickersNameMap.size(); i++) {
+            char buffer[200];
+            snprintf(buffer, 200, "%s COUNT : %lu\n",
+                     TickersNameMap[i].second.c_str(), GetTickerCount(TickersNameMap[i].first));
+            res.append(buffer);
+        }
+        for (uint32_t i = 0; i < HistogramsNameMap.size(); i++) {
+            char buffer[200];
+            HistogramData hData;
+            GetHistogramData(HistogramsNameMap[i].first, &hData);
+            snprintf(buffer,
+                     200,
+                     "%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f\n",
+                     HistogramsNameMap[i].second.c_str(),
+                     hData.median,
+                     hData.percentile95,
+                     hData.percentile99);
+            res.append(buffer);
+        }
+        res.shrink_to_fit();
+        return res;
+    }
+
+    void ClearAll() {
+        for (uint32_t i = 0; i < TICKER_ENUM_MAX; i++) {
+            counter_[i].Clear();
+        }
+        for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; i++) {
+            hist_[i].Clear();
+        }
+    }
+
+private:
+    tera::Counter counter_[TICKER_ENUM_MAX];
+    Histogram hist_[HISTOGRAM_ENUM_MAX];
+};
+
+Statistics* CreateDBStatistics() {
+    return new StatisticsImpl;
+}
+
+} // namespace leveldb
diff --git a/src/sdk/sdk_zk.cc b/src/sdk/sdk_zk.cc
index e08bb6c9b..5f7b8c8f6 100644
--- a/src/sdk/sdk_zk.cc
+++ b/src/sdk/sdk_zk.cc
@@ -60,9 +60,6 @@ std::string ClusterFinder::ClusterId() {
   std::string name = Name();
   std::string authority = Authority();
   std::string path = Path();
-  if (name.empty() || authority.empty() || path.empty()) {
-    LOG(FATAL) << "cluster name/authority/path must be non-empty";
-  }
   std::string cluster_id = name + "://" + authority;
   if (path[0] != '/') {
     cluster_id += "/";
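statistics.cc lands only the implementation; for orientation, this is how
block_cache.cc above drives it. A condensed sketch, with `env` standing in for
options_.cache_env and the enum value taken from the calls above:

    leveldb::Statistics* stat = leveldb::CreateDBStatistics();
    uint64_t start_ts = env->NowMicros();
    // ... timed section ...
    stat->MeasureTime(TERA_BLOCK_CACHE_ALLOC_FID, env->NowMicros() - start_ts);
    printf("%s\n", stat->GetBriefHistogramString(TERA_BLOCK_CACHE_ALLOC_FID).c_str());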
diff --git a/src/tabletnode/remote_tabletnode.cc b/src/tabletnode/remote_tabletnode.cc
index 2d95a0e5a..a59061369 100644
--- a/src/tabletnode/remote_tabletnode.cc
+++ b/src/tabletnode/remote_tabletnode.cc
@@ -322,7 +322,7 @@ void RemoteTabletNode::DoReadTablet(google::protobuf::RpcController* controller,
     int64_t read_timeout = request->client_timeout_ms() * 1000;  // ms -> us
     int64_t detal = get_micros() - start_micros;
     if (detal > read_timeout) {
-        VLOG(5) << "timeout, drop read request for:" << request->tablet_name()
+        VLOG(8) << "timeout, drop read request for:" << request->tablet_name()
                 << ", detal(in us):" << detal
                 << ", read_timeout(in us):" << read_timeout;
         is_read_timeout = true;
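Before the tabletnode_impl.cc hunk below: with tera_tabletnode_block_cache_enabled
set, InitCacheSystem() creates each cache directory and registers every path with
the shared BlockCacheEnv singleton. Condensed sketch of that flow (error handling
omitted; BlockCacheOptions fields stay at their defaults, as in the patch):

    leveldb::Env* env = io::DefaultBlockCacheEnv();  // base env wrapped with the SSD cache
    std::vector<std::string> paths;
    SplitString(FLAGS_tera_tabletnode_cache_paths, ";", &paths);
    for (size_t i = 0; i < paths.size(); ++i) {
        leveldb::BlockCacheOptions opts;  // defaults
        reinterpret_cast<leveldb::BlockCacheEnv*>(env)->LoadCache(opts, paths[i] + "/block_cache");
    }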
diff --git a/src/tabletnode/tabletnode_impl.cc b/src/tabletnode/tabletnode_impl.cc
index c472b9732..ff65562a3 100644
--- a/src/tabletnode/tabletnode_impl.cc
+++ b/src/tabletnode/tabletnode_impl.cc
@@ -14,9 +14,11 @@
 #include "db/filename.h"
 #include "db/table_cache.h"
+#include "common/base/string_ext.h"
 #include "common/thread.h"
 #include "io/io_utils.h"
 #include "io/utils_leveldb.h"
+#include "leveldb/block_cache.h"
 #include "leveldb/cache.h"
 #include "leveldb/env_cache.h"
 #include "leveldb/env_dfs.h"
@@ -68,7 +70,7 @@ DECLARE_string(tera_tabletnode_path_prefix);
 
 // cache-related
 DECLARE_int32(tera_memenv_block_cache_size);
-DECLARE_bool(tera_tabletnode_cache_enabled);
+DECLARE_bool(tera_tabletnode_block_cache_enabled);
 DECLARE_string(tera_tabletnode_cache_paths);
 DECLARE_int32(tera_tabletnode_cache_block_size);
 DECLARE_string(tera_tabletnode_cache_name);
@@ -150,11 +152,7 @@ TabletNodeImpl::TabletNodeImpl()
     sysinfo_.SetProcessStartTime(get_micros());
 }
 
-TabletNodeImpl::~TabletNodeImpl() {
-    if (FLAGS_tera_tabletnode_cache_enabled) {
-        leveldb::ThreeLevelCacheEnv::RemoveCachePaths();
-    }
-}
+TabletNodeImpl::~TabletNodeImpl() {}
 
 bool TabletNodeImpl::Init() {
     if (FLAGS_tera_zk_enabled) {
@@ -179,32 +177,32 @@ bool TabletNodeImpl::Init() {
 }
 
 void TabletNodeImpl::InitCacheSystem() {
-    if (!FLAGS_tera_tabletnode_cache_enabled) {
-        // compatible with legacy FlashEnv
-        leveldb::FlashEnv* flash_env = (leveldb::FlashEnv*)io::LeveldbFlashEnv();
-        flash_env->SetFlashPath(FLAGS_tera_tabletnode_cache_paths,
-                                FLAGS_tera_io_cache_path_vanish_allowed);
-        flash_env->SetUpdateFlashThreadNumber(FLAGS_tera_tabletnode_cache_update_thread_num);
-        flash_env->SetIfForceReadFromCache(FLAGS_tera_tabletnode_cache_force_read_from_cache);
-        return;
-    }
+    if (FLAGS_tera_tabletnode_block_cache_enabled) {
+        LOG(INFO) << "t-cache: set flash path: " << FLAGS_tera_tabletnode_cache_paths;
+        std::vector<std::string> path_list;
+        SplitString(FLAGS_tera_tabletnode_cache_paths, ";", &path_list);
+
+        leveldb::Env* posix_env = leveldb::Env::Default();
+        for (uint32_t i = 0; i < path_list.size(); ++i) {
+            posix_env->CreateDir(path_list[i]);
+        }
 
-    LOG(INFO) << "activate new cache system";
-    // new cache mechanism
-    leveldb::ThreeLevelCacheEnv::SetCachePaths(FLAGS_tera_tabletnode_cache_paths);
-    leveldb::ThreeLevelCacheEnv::s_mem_cache_size_in_KB_ = FLAGS_tera_tabletnode_cache_mem_size;
-    leveldb::ThreeLevelCacheEnv::s_disk_cache_size_in_MB_ = FLAGS_tera_tabletnode_cache_disk_size;
-    leveldb::ThreeLevelCacheEnv::s_block_size_ = FLAGS_tera_tabletnode_cache_block_size;
-    leveldb::ThreeLevelCacheEnv::s_disk_cache_file_num_ = FLAGS_tera_tabletnode_cache_disk_filenum;
-    leveldb::ThreeLevelCacheEnv::s_disk_cache_file_name_ = FLAGS_tera_tabletnode_cache_name;
-
-    if (FLAGS_tera_tabletnode_cache_log_level < 3) {
-        LEVELDB_SET_LOG_LEVEL(WARNING);
-    } else if (FLAGS_tera_tabletnode_cache_log_level < 4) {
-        LEVELDB_SET_LOG_LEVEL(INFO);
-    } else {
-        LEVELDB_SET_LOG_LEVEL(DEBUG);
+        LOG(INFO) << "activate t-cache system";
+        leveldb::Env* block_cache_env = io::DefaultBlockCacheEnv();
+        for (uint32_t i = 0; i < path_list.size(); ++i) {
+            leveldb::BlockCacheOptions opts;
+            LOG(INFO) << "load cache: " << path_list[i];
+            reinterpret_cast<leveldb::BlockCacheEnv*>(block_cache_env)->LoadCache(opts, path_list[i] + "/block_cache");
+        }
+        return;
     }
+    // compatible with legacy FlashEnv
+    leveldb::FlashEnv* flash_env = (leveldb::FlashEnv*)io::LeveldbFlashEnv();
+    flash_env->SetFlashPath(FLAGS_tera_tabletnode_cache_paths,
+                            FLAGS_tera_io_cache_path_vanish_allowed);
+    flash_env->SetUpdateFlashThreadNumber(FLAGS_tera_tabletnode_cache_update_thread_num);
+    flash_env->SetIfForceReadFromCache(FLAGS_tera_tabletnode_cache_force_read_from_cache);
+    return;
 }
 
 bool TabletNodeImpl::Exit() {
@@ -1070,7 +1068,7 @@ void TabletNodeImpl::UpdateMetaTableCallback(const SplitTabletRequest* rpc_reque
  * ------------------------------------------
  */
 void TabletNodeImpl::GarbageCollect() {
-    if (FLAGS_tera_tabletnode_cache_enabled) {
+    if (FLAGS_tera_tabletnode_block_cache_enabled) {
         return;
     }
     int64_t start_ms = get_micros();
diff --git a/src/tera_flags.cc b/src/tera_flags.cc
index 59d7b7a9e..949c36b98 100644
--- a/src/tera_flags.cc
+++ b/src/tera_flags.cc
@@ -64,6 +64,7 @@ DEFINE_int32(tera_leveldb_env_dfs_seek_latency, 10000000, "the random access lat
 DEFINE_int32(tera_memenv_table_cache_size, 100, "the max open file number in leveldb table_cache");
 DEFINE_int32(tera_memenv_block_cache_size, 10000, "block cache size for leveldb which do not use share block cache");
 DEFINE_bool(tera_use_flash_for_memenv, true, "Use flashenv for memory lg");
+DEFINE_int32(tera_leveldb_block_cache_env_thread_num, 30, "thread num of t-cache");
 DEFINE_string(tera_leveldb_compact_strategy, "default", "the default strategy to drive consum compaction, should be [default|LG|dummy]");
 DEFINE_bool(tera_leveldb_verify_checksums, true, "enable verify data read from storage against checksums");
@@ -201,7 +202,7 @@ DEFINE_string(tera_tabletnode_cpu_affinity_set, "1,2", "the cpu set of cpu affin
 DEFINE_bool(tera_tabletnode_hang_detect_enabled, false, "enable detect read/write hang");
 DEFINE_int32(tera_tabletnode_hang_detect_threshold, 60000, "read/write hang detect threshold (in ms)");
-DEFINE_bool(tera_tabletnode_cache_enabled, false, "enable three-level cache mechanism");
+DEFINE_bool(tera_tabletnode_block_cache_enabled, true, "enable t-cache mechanism");
 DEFINE_string(tera_tabletnode_cache_paths, "../data/cache/", "paths for cached data storage. Multiple definition like: \"./path1/;./path2/\"");
 DEFINE_int32(tera_tabletnode_cache_block_size, 8192, "the block size of cache system");
 DEFINE_string(tera_tabletnode_cache_name, "tera.cache", "prefix name for cache name");
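Net effect of the flag changes: the legacy three-level cache path is removed and
t-cache is on by default. A minimal tablet-node flag set exercising the new path
might look like this (the cache paths are illustrative, the separator is ";" as
documented above):

    --tera_tabletnode_block_cache_enabled=true
    --tera_tabletnode_cache_paths=/ssd1/cache/;/ssd2/cache/
    --tera_leveldb_block_cache_env_thread_num=30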