diff --git a/benchmarks/micro-benchmarks/InsertUpdateBench.cpp b/benchmarks/micro-benchmarks/InsertUpdateBench.cpp index 9da5871d..cedc4b8f 100644 --- a/benchmarks/micro-benchmarks/InsertUpdateBench.cpp +++ b/benchmarks/micro-benchmarks/InsertUpdateBench.cpp @@ -22,7 +22,7 @@ static void BenchUpdateInsert(benchmark::State& state) { std::filesystem::remove_all(dirPath); std::filesystem::create_directories(dirPath); - StoreOption* option = CreateStoreOption("/tmp/InsertUpdateBench"); + StoreOption* option = CreateStoreOption("/tmp/leanstore/InsertUpdateBench"); option->mCreateFromScratch = true; option->mWorkerThreads = 4; auto sLeanStore = std::make_unique<leanstore::LeanStore>(option); diff --git a/benchmarks/ycsb/YcsbLeanStore.hpp b/benchmarks/ycsb/YcsbLeanStore.hpp index 51a55800..bf7b3575 100644 --- a/benchmarks/ycsb/YcsbLeanStore.hpp +++ b/benchmarks/ycsb/YcsbLeanStore.hpp @@ -41,6 +41,7 @@ class YcsbLeanStore : public YcsbExecutor { auto dataDirStr = FLAGS_ycsb_data_dir + std::string("/leanstore"); StoreOption* option = CreateStoreOption(dataDirStr.c_str()); option->mCreateFromScratch = createFromScratch; + option->mEnableEagerGc = true; option->mWorkerThreads = FLAGS_ycsb_threads; option->mBufferPoolSize = FLAGS_ycsb_mem_kb * 1024; option->mEnableMetrics = true; diff --git a/benchmarks/ycsb/ycsb-config.flags b/benchmarks/ycsb/ycsb-config.flags index def1faee..95cede19 100644 --- a/benchmarks/ycsb/ycsb-config.flags +++ b/benchmarks/ycsb/ycsb-config.flags @@ -3,8 +3,8 @@ --ycsb_val_size=200 --ycsb_record_count=100000 --ycsb_mem_kb=1048576 ---ycsb_run_for_seconds=5 ---ycsb_target=basickv ---ycsb_workload=c ---ycsb_threads=2 ---ycsb_cmd=load +--ycsb_run_for_seconds=600 +--ycsb_target=transactionkv +--ycsb_workload=a +--ycsb_threads=8 +--ycsb_cmd=run diff --git a/include/leanstore/LeanStore.hpp b/include/leanstore/LeanStore.hpp index 242bdb67..385350cc 100644 --- a/include/leanstore/LeanStore.hpp +++ b/include/leanstore/LeanStore.hpp @@ -73,10 +73,13 @@ class LeanStore { //! NOTE: Owned by LeanStore instance, should be destroyed together with it cr::CRManager* mCRManager; - //! The global timestamp oracle, used to generate start and commit timestamps - //! for all transactions in the store. Start from a positive number, 0 - //! indicates invalid timestamp - std::atomic<uint64_t> mTimestampOracle = 1; + //! The global timestamp oracle for user transactions. Used to generate start and commit + //! timestamps for user transactions. Start from a positive number, 0 indicates invalid timestamp + std::atomic<uint64_t> mUsrTso = 1; + + //! The global timestamp oracle for system transactions. Used to generate timestamps for system + //! transactions. Start from a positive number, 0 indicates invalid timestamp + std::atomic<uint64_t> mSysTso = 1; //! The metrics manager std::unique_ptr<telemetry::MetricsManager> mMetricsManager; @@ -118,13 +121,21 @@ class LeanStore { //! Unregister a TransactionKV void DropTransactionKV(const std::string& name); - uint64_t GetTs() { - return mTimestampOracle.load(); + uint64_t GetUsrTxTs() { + return mUsrTso.load(); + } + + uint64_t GetSysTxTs() { + return mSysTso.load(); } //! Alloc a new timestamp from the timestamp oracle - uint64_t AllocTs() { - return mTimestampOracle.fetch_add(1); + uint64_t AllocUsrTxTs() { + return mUsrTso.fetch_add(1); + } + + uint64_t AllocSysTxTs() { + return mSysTso.fetch_add(1); } //! Execute a custom user function on a worker thread.
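The heart of this patch is the split of the single timestamp oracle into a user oracle (mUsrTso) and a system oracle (mSysTso). A minimal sketch of how the two are meant to be consumed, assuming only the accessors shown above; the function below and its name are illustrative, not part of the patch:

```cpp
#include "leanstore/LeanStore.hpp"

// Sketch: user and system transactions draw timestamps from independent
// counters, so structure-modifying operations (splits, merges) no longer
// consume user-transaction timestamps.
void TimestampOracleSketch(leanstore::LeanStore& store) {
  // a system transaction (e.g. a page split) stamps the pages it touches
  TXID sysTxId = store.AllocSysTxTs();

  // a user transaction takes its start timestamp from the user oracle
  uint64_t startTs = store.AllocUsrTxTs();

  // both oracles can be observed without advancing them
  uint64_t sysNow = store.GetSysTxTs();
  uint64_t usrNow = store.GetUsrTxTs();
  (void)sysTxId, (void)startTs, (void)sysNow, (void)usrNow;
}
```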
diff --git a/include/leanstore/btree/core/BTreeGeneric.hpp b/include/leanstore/btree/core/BTreeGeneric.hpp index 1daefedd..79e90f4d 100644 --- a/include/leanstore/btree/core/BTreeGeneric.hpp +++ b/include/leanstore/btree/core/BTreeGeneric.hpp @@ -10,6 +10,8 @@ #include "leanstore/sync/HybridLatch.hpp" #include "leanstore/utils/Log.hpp" +#include + #include #include @@ -55,9 +57,9 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree { //! Try to merge the current node with its left or right sibling, reclaim the merged left or right //! sibling if successful. - bool TryMergeMayJump(BufferFrame& toMerge, bool swizzleSibling = true); + bool TryMergeMayJump(TXID sysTxId, BufferFrame& toMerge, bool swizzleSibling = true); - void TrySplitMayJump(BufferFrame& toSplit, int16_t pos = -1); + void TrySplitMayJump(TXID sysTxId, BufferFrame& toSplit, int16_t pos = -1); XMergeReturnCode XMerge(GuardedBufferFrame& guardedParent, GuardedBufferFrame& guardedChild, @@ -141,7 +143,7 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree { //! | | //! newLeft toSplit /// - void splitRootMayJump(GuardedBufferFrame& guardedParent, + void splitRootMayJump(TXID sysTxId, GuardedBufferFrame& guardedParent, GuardedBufferFrame& guardedChild, const BTreeNode::SeparatorInfo& sepInfo); @@ -152,7 +154,7 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree { //! | | | //! toSplit newLeft toSplit /// - void splitNonRootMayJump(GuardedBufferFrame& guardedParent, + void splitNonRootMayJump(TXID sysTxId, GuardedBufferFrame& guardedParent, GuardedBufferFrame& guardedChild, const BTreeNode::SeparatorInfo& sepInfo, uint16_t spaceNeededForSeparator); @@ -198,15 +200,15 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree { xGuardedMeta.Reclaim(); } - // static void ToJson(BTreeGeneric& btree, rapidjson::Document* resultDoc); + static void ToJson(BTreeGeneric& btree, rapidjson::Document* resultDoc); private: static void freeBTreeNodesRecursive(BTreeGeneric& btree, GuardedBufferFrame& guardedNode); - // static void toJsonRecursive(BTreeGeneric& btree, GuardedBufferFrame& guardedNode, - // rapidjson::Value* resultObj, - // rapidjson::Value::AllocatorType& allocator); + static void toJsonRecursive(BTreeGeneric& btree, GuardedBufferFrame& guardedNode, + rapidjson::Value* resultObj, + rapidjson::Value::AllocatorType& allocator); static ParentSwipHandler findParentMayJump(BTreeGeneric& btree, BufferFrame& bfToFind) { return FindParent(btree, bfToFind); diff --git a/include/leanstore/btree/core/PessimisticExclusiveIterator.hpp b/include/leanstore/btree/core/PessimisticExclusiveIterator.hpp index 5e7c000b..6b11373b 100644 --- a/include/leanstore/btree/core/PessimisticExclusiveIterator.hpp +++ b/include/leanstore/btree/core/PessimisticExclusiveIterator.hpp @@ -56,22 +56,23 @@ class PessimisticExclusiveIterator : public PessimisticIterator { virtual void InsertToCurrentNode(Slice key, Slice val) { LS_DCHECK(KeyInCurrentNode(key)); LS_DCHECK(HasEnoughSpaceFor(key.size(), val.size())); - LS_DCHECK(mSlotId != -1); + LS_DCHECK(Valid()); mSlotId = mGuardedLeaf->InsertDoNotCopyPayload(key, val.size(), mSlotId); std::memcpy(mGuardedLeaf->ValData(mSlotId), val.data(), val.size()); } void SplitForKey(Slice key) { + auto sysTxId = mBTree.mStore->AllocSysTxTs(); while (true) { JUMPMU_TRY() { - if (mSlotId == -1 || !KeyInCurrentNode(key)) { + if (!Valid() || !KeyInCurrentNode(key)) { mBTree.FindLeafCanJump(key, mGuardedLeaf); } BufferFrame* bf = mGuardedLeaf.mBf; mGuardedLeaf.unlock(); - 
mSlotId = -1; + SetToInvalid(); - mBTree.TrySplitMayJump(*bf); + mBTree.TrySplitMayJump(sysTxId, *bf); COUNTERS_BLOCK() { WorkerCounters::MyCounters().dt_split[mBTree.mTreeId]++; } @@ -162,7 +163,8 @@ class PessimisticExclusiveIterator : public PessimisticIterator { mSlotId = -1; JUMPMU_TRY() { - mBTree.TrySplitMayJump(*mGuardedLeaf.mBf, splitSlot); + TXID sysTxId = mBTree.mStore->AllocSysTxTs(); + mBTree.TrySplitMayJump(sysTxId, *mGuardedLeaf.mBf, splitSlot); LS_DLOG("[Contention Split] succeed, pageId={}, contention pct={}, split " "slot={}", @@ -201,7 +203,8 @@ class PessimisticExclusiveIterator : public PessimisticIterator { mGuardedLeaf.unlock(); mSlotId = -1; JUMPMU_TRY() { - mBTree.TryMergeMayJump(*mGuardedLeaf.mBf); + TXID sysTxId = mBTree.mStore->AllocSysTxTs(); + mBTree.TryMergeMayJump(sysTxId, *mGuardedLeaf.mBf); } JUMPMU_CATCH() { LS_DLOG("TryMergeIfNeeded failed, pageId={}", mGuardedLeaf.mBf->mHeader.mPageId); diff --git a/include/leanstore/btree/core/PessimisticIterator.hpp b/include/leanstore/btree/core/PessimisticIterator.hpp index e55c8653..0e230529 100644 --- a/include/leanstore/btree/core/PessimisticIterator.hpp +++ b/include/leanstore/btree/core/PessimisticIterator.hpp @@ -12,6 +12,8 @@ #include +#include + namespace leanstore::storage::btree { using LeafCallback = std::function& guardedLeaf)>; @@ -401,7 +403,8 @@ inline void PessimisticIterator::Next() { if (mGuardedLeaf->mNumSlots == 0) { SetCleanUpCallback([&, toMerge = mGuardedLeaf.mBf]() { JUMPMU_TRY() { - mBTree.TryMergeMayJump(*toMerge, true); + TXID sysTxId = mBTree.mStore->AllocSysTxTs(); + mBTree.TryMergeMayJump(sysTxId, *toMerge, true); } JUMPMU_CATCH() { } diff --git a/include/leanstore/buffer-manager/AsyncWriteBuffer.hpp b/include/leanstore/buffer-manager/AsyncWriteBuffer.hpp index c59dc96f..529f431e 100644 --- a/include/leanstore/buffer-manager/AsyncWriteBuffer.hpp +++ b/include/leanstore/buffer-manager/AsyncWriteBuffer.hpp @@ -25,7 +25,7 @@ namespace leanstore::storage { //! writeBuffer.SubmitAll(); //! writeBuffer.WaitAll(); //! writeBuffer.IterateFlushedBfs([](BufferFrame& flushedBf, uint64_t -//! flushedGsn) { +//! flushedPsn) { //! // do something with flushedBf //! }, numFlushedBfs); /// @@ -72,7 +72,7 @@ class AsyncWriteBuffer { return mAIo.GetNumRequests(); } - void IterateFlushedBfs(std::function callback, + void IterateFlushedBfs(std::function callback, uint64_t numFlushedBfs); private: diff --git a/include/leanstore/buffer-manager/BufferFrame.hpp b/include/leanstore/buffer-manager/BufferFrame.hpp index 4824a0fe..77a2b0f0 100644 --- a/include/leanstore/buffer-manager/BufferFrame.hpp +++ b/include/leanstore/buffer-manager/BufferFrame.hpp @@ -9,6 +9,7 @@ #include "leanstore/utils/UserThread.hpp" #include +#include #include namespace leanstore::storage { @@ -66,14 +67,14 @@ class BufferFrameHeader { //! ID of page resides in this buffer frame. PID mPageId = std::numeric_limits::max(); - //! ID of the last worker who has modified the containing page. For remote - //! flush avoidance (RFA), see "Rethinking Logging, Checkpoints, and Recovery - //! for High-Performance Storage Engines, SIGMOD 2020" for details. + //! ID of the last worker who has modified the containing page. For remote flush avoidance (RFA), + //! see "Rethinking Logging, Checkpoints, and Recovery for High-Performance Storage Engines, + //! SIGMOD 2020" for details. WORKERID mLastWriterWorker = std::numeric_limits::max(); - //! The flushed global sequence number of the containing page. Initialized to - //! 
the page GSN when loaded from disk. - uint64_t mFlushedGsn = 0; + //! The flushed page sequence number of the containing page. Initialized when the containing page + //! is loaded from disk. + uint64_t mFlushedPsn = 0; //! Whether the containing page is being written back to disk. std::atomic mIsBeingWrittenBack = false; @@ -97,7 +98,7 @@ class BufferFrameHeader { mPageId = std::numeric_limits::max(); mLastWriterWorker = std::numeric_limits::max(); - mFlushedGsn = 0; + mFlushedPsn = 0; mIsBeingWrittenBack.store(false, std::memory_order_release); mContentionStats.Reset(); mCrc = 0; @@ -130,9 +131,15 @@ class Page { //! Short for "global sequence number", increased when a page is modified. //! It's used to check whether the page has been read or written by //! transactions in other workers. - //! A page is "dirty" when mPage.mGSN > mHeader.mFlushedGsn. uint64_t mGSN = 0; + //! Short for "system transaction id", increased when a system transaction modifies the page. + uint64_t mSysTxId = 0; + + //! Short for "page sequence number", increased when a page is modified by any user or system + //! transaction. A page is "dirty" when mPage.mPsn > mHeader.mFlushedPsn. + uint64_t mPsn = 0; + //! The btree ID it belongs to. TREEID mBTreeId = std::numeric_limits::max(); @@ -176,7 +183,7 @@ class BufferFrame { } bool IsDirty() const { - return mPage.mGSN != mHeader.mFlushedGsn; + return mPage.mPsn != mHeader.mFlushedPsn; } bool IsFree() const { @@ -192,9 +199,11 @@ class BufferFrame { LS_DCHECK(mHeader.mState == State::kFree); mHeader.mPageId = pageId; mHeader.mState = State::kHot; - mHeader.mFlushedGsn = 0; + mHeader.mFlushedPsn = 0; mPage.mGSN = 0; + mPage.mSysTxId = 0; + mPage.mPsn = 0; } // Pre: bf is exclusively locked diff --git a/include/leanstore/buffer-manager/GuardedBufferFrame.hpp b/include/leanstore/buffer-manager/GuardedBufferFrame.hpp index 8ef02b34..2f9a48ad 100644 --- a/include/leanstore/buffer-manager/GuardedBufferFrame.hpp +++ b/include/leanstore/buffer-manager/GuardedBufferFrame.hpp @@ -78,7 +78,7 @@ class GuardedBufferFrame { mGuard(&mBf->mHeader.mLatch), mKeepAlive(true) { latchMayJump(mGuard, latchMode); - SyncGSNBeforeRead(); + CheckRemoteDependency(); JUMPMU_PUSH_BACK_DESTRUCTOR_BEFORE_JUMP(); } @@ -96,7 +96,7 @@ class GuardedBufferFrame { mGuard(&mBf->mHeader.mLatch), mKeepAlive(true) { latchMayJump(mGuard, latchMode); - SyncGSNBeforeRead(); + CheckRemoteDependency(); JUMPMU_PUSH_BACK_DESTRUCTOR_BEFORE_JUMP(); guardedParent.JumpIfModifiedByOthers(); @@ -142,47 +142,44 @@ class GuardedBufferFrame { } public: - inline void SyncGSNBeforeWrite() { + //! Mark the page as dirty after modification by a user or system transaction. + void MarkPageAsDirty() { + LS_DCHECK(mBf != nullptr); + mBf->mPage.mPsn++; + } + + //! Sync the system transaction id to the page. Page system transaction id is updated during the + //! execution of a system transaction. 
+ void SyncSystemTxId(TXID sysTxId) { LS_DCHECK(mBf != nullptr); - LS_DCHECK(mBf->mPage.mGSN <= cr::WorkerContext::My().mLogging.GetCurrentGsn(), - "Page GSN should <= worker GSN, pageGSN={}, workerGSN={}", mBf->mPage.mGSN, - cr::WorkerContext::My().mLogging.GetCurrentGsn()); // update last writer worker mBf->mHeader.mLastWriterWorker = cr::WorkerContext::My().mWorkerId; - // increase GSN - const auto workerGSN = cr::WorkerContext::My().mLogging.GetCurrentGsn(); - mBf->mPage.mGSN = workerGSN + 1; - cr::WorkerContext::My().mLogging.SetCurrentGsn(workerGSN + 1); + // update system transaction id + mBf->mPage.mSysTxId = sysTxId; + + // update the maximum system transaction id written by the worker + cr::WorkerContext::My().mLogging.UpdateSysTxWrittern(sysTxId); } - // TODO: don't sync on temporary table pages like history trees - inline void SyncGSNBeforeRead() { + //! Check remote dependency + //! TODO: don't sync on temporary table pages like history trees + void CheckRemoteDependency() { // skip if not running inside a worker if (!cr::WorkerContext::InWorker()) { return; } - if (!cr::WorkerContext::My().mLogging.mHasRemoteDependency && - mBf->mPage.mGSN > cr::WorkerContext::My().mLogging.mTxReadSnapshot && - mBf->mHeader.mLastWriterWorker != cr::WorkerContext::My().mWorkerId) { - cr::WorkerContext::My().mLogging.mHasRemoteDependency = true; - LS_DLOG("Detected remote dependency, workerId={}, " - "txReadSnapshot(GSN)={}, pageLastWriterWorker={}, pageGSN={}", - cr::WorkerContext::My().mWorkerId, cr::WorkerContext::My().mLogging.mTxReadSnapshot, - mBf->mHeader.mLastWriterWorker, mBf->mPage.mGSN); - } - - const auto workerGSN = cr::WorkerContext::My().mLogging.GetCurrentGsn(); - const auto pageGSN = mBf->mPage.mGSN; - if (workerGSN < pageGSN) { - cr::WorkerContext::My().mLogging.SetCurrentGsn(pageGSN); + if (mBf->mHeader.mLastWriterWorker != cr::WorkerContext::My().mWorkerId && + mBf->mPage.mSysTxId > cr::ActiveTx().mMaxObservedSysTxId) { + cr::ActiveTx().mMaxObservedSysTxId = mBf->mPage.mSysTxId; + cr::ActiveTx().mHasRemoteDependency = true; } } template - inline cr::WalPayloadHandler ReserveWALPayload(uint64_t walSize, Args&&... args) { + cr::WalPayloadHandler ReserveWALPayload(uint64_t walSize, Args&&... args) { LS_DCHECK(cr::ActiveTx().mIsDurable); LS_DCHECK(mGuard.mState == GuardState::kPessimisticExclusive); @@ -190,45 +187,44 @@ class GuardedBufferFrame { const auto treeId = mBf->mPage.mBTreeId; walSize = ((walSize - 1) / 8 + 1) * 8; auto handler = cr::WorkerContext::My().mLogging.ReserveWALEntryComplex( - sizeof(WT) + walSize, pageId, mBf->mPage.mGSN, treeId, std::forward(args)...); + sizeof(WT) + walSize, pageId, mBf->mPage.mPsn, treeId, std::forward(args)...); - SyncGSNBeforeWrite(); return handler; } template - inline void WriteWal(uint64_t walSize, Args&&... args) { + void WriteWal(uint64_t walSize, Args&&... 
args) { auto handle = ReserveWALPayload(walSize, std::forward(args)...); handle.SubmitWal(); } - inline bool EncounteredContention() { + bool EncounteredContention() { return mGuard.mEncounteredContention; } // NOLINTBEGIN - inline void unlock() { + void unlock() { mGuard.Unlock(); } - inline void JumpIfModifiedByOthers() { + void JumpIfModifiedByOthers() { mGuard.JumpIfModifiedByOthers(); } - inline T& ref() { + T& ref() { return *reinterpret_cast(mBf->mPage.mPayload); } - inline T* ptr() { + T* ptr() { return reinterpret_cast(mBf->mPage.mPayload); } - inline Swip swip() { + Swip swip() { return Swip(mBf); } - inline T* operator->() { + T* operator->() { return reinterpret_cast(mBf->mPage.mPayload); } @@ -311,8 +307,8 @@ class ExclusiveGuardedBufferFrame { mRefGuard.mKeepAlive = true; } - void SyncGSNBeforeWrite() { - mRefGuard.SyncGSNBeforeWrite(); + void SyncSystemTxId(TXID sysTxId) { + mRefGuard.SyncSystemTxId(sysTxId); } ~ExclusiveGuardedBufferFrame() { @@ -330,27 +326,27 @@ class ExclusiveGuardedBufferFrame { new (mRefGuard.mBf->mPage.mPayload) PayloadType(std::forward(args)...); } - inline uint8_t* GetPagePayloadPtr() { + uint8_t* GetPagePayloadPtr() { return reinterpret_cast(mRefGuard.mBf->mPage.mPayload); } - inline PayloadType* GetPagePayload() { + PayloadType* GetPagePayload() { return reinterpret_cast(mRefGuard.mBf->mPage.mPayload); } - inline PayloadType* operator->() { + PayloadType* operator->() { return GetPagePayload(); } - inline Swip swip() { + Swip swip() { return Swip(mRefGuard.mBf); } - inline BufferFrame* bf() { + BufferFrame* bf() { return mRefGuard.mBf; } - inline void Reclaim() { + void Reclaim() { mRefGuard.Reclaim(); } }; diff --git a/include/leanstore/concurrency/ConcurrencyControl.hpp b/include/leanstore/concurrency/ConcurrencyControl.hpp index 93c40886..edda4119 100644 --- a/include/leanstore/concurrency/ConcurrencyControl.hpp +++ b/include/leanstore/concurrency/ConcurrencyControl.hpp @@ -4,6 +4,7 @@ #include "leanstore/Units.hpp" #include "leanstore/concurrency/HistoryStorage.hpp" #include "leanstore/profiling/counters/CRCounters.hpp" +#include "leanstore/sync/HybridLatch.hpp" #include "leanstore/utils/Log.hpp" #include "leanstore/utils/Misc.hpp" @@ -26,8 +27,8 @@ namespace leanstore::cr { //! in the system when full. class CommitTree { public: - //! The mutex to guard the commit log. - std::shared_mutex mMutex; + //! The hybrid latch to guard the commit log. + storage::HybridLatch mLatch; //! The capacity of the commit log. Commit log is compacted when full. uint64_t mCapacity; diff --git a/include/leanstore/concurrency/GroupCommitter.hpp b/include/leanstore/concurrency/GroupCommitter.hpp index 1a71dc23..106cfe0b 100644 --- a/include/leanstore/concurrency/GroupCommitter.hpp +++ b/include/leanstore/concurrency/GroupCommitter.hpp @@ -5,6 +5,7 @@ #include "leanstore/utils/AsyncIo.hpp" #include "leanstore/utils/UserThread.hpp" +#include #include #include @@ -30,14 +31,9 @@ class GroupCommitter : public leanstore::utils::UserThread { //! Start file offset of the next WalEntry. uint64_t mWalSize; - //! The minimum flushed GSN among all worker threads. Transactions whose max observed GSN not - //! larger than it can be committed safely. - std::atomic mGlobalMinFlushedGSN; - - //! The maximum flushed GSN among all worker threads in each group commit round. It is updated by - //! the group commit thread and used to update the GCN counter of the current worker thread to - //! prevent GSN from skewing and undermining RFA. - std::atomic mGlobalMaxFlushedGSN; + //! 
The minimum flushed system transaction ID among all worker threads. User transactions whose + //! max observed system transaction ID not larger than it can be committed safely. + std::atomic<TXID> mGlobalMinFlushedSysTx; //! All the workers. std::vector<WorkerContext*>& mWorkerCtxs; @@ -52,8 +48,7 @@ mStore(store), mWalFd(walFd), mWalSize(0), - mGlobalMinFlushedGSN(0), - mGlobalMaxFlushedGSN(0), + mGlobalMinFlushedSysTx(0), mWorkerCtxs(workers), mAIo(workers.size() * 2 + 2) { } @@ -67,12 +62,11 @@ //! Phase 1: collect wal records from all the worker threads. Collected wal records are written to //! libaio IOCBs. //! - //! @param[out] minFlushedGSN the min flushed GSN among all the wal records - //! @param[out] maxFlushedGSN the max flushed GSN among all the wal records - //! @param[out] minFlushedTxId the min flushed transaction ID + //! @param[out] minFlushedSysTx the min flushed system transaction ID + //! @param[out] minFlushedUsrTx the min flushed user transaction ID //! @param[out] numRfaTxs number of transactions without dependency //! @param[out] walFlushReqCopies snapshot of the flush requests - void collectWalRecords(uint64_t& minFlushedGSN, uint64_t& maxFlushedGSN, TXID& minFlushedTxId, + void collectWalRecords(TXID& minFlushedSysTx, TXID& minFlushedUsrTx, std::vector<uint64_t>& numRfaTxs, std::vector<WalFlushReq>& walFlushReqCopies); @@ -81,12 +75,11 @@ //! Phase 3: determine the commitable transactions based on minFlushedGSN and minFlushedTxId. //! - //! @param[in] minFlushedGSN the min flushed GSN among all the wal records - //! @param[in] maxFlushedGSN the max flushed GSN among all the wal records - //! @param[in] minFlushedTxId the min flushed transaction ID + //! @param[in] minFlushedSysTx the min flushed system transaction ID + //! @param[in] minFlushedUsrTx the min flushed user transaction ID //! @param[in] numRfaTxs number of transactions without dependency //! @param[in] walFlushReqCopies snapshot of the flush requests - void determineCommitableTx(uint64_t minFlushedGSN, uint64_t maxFlushedGSN, TXID minFlushedTxId, + void determineCommitableTx(TXID minFlushedSysTx, TXID minFlushedUsrTx, const std::vector<uint64_t>& numRfaTxs, const std::vector<WalFlushReq>& walFlushReqCopies); diff --git a/include/leanstore/concurrency/Logging.hpp b/include/leanstore/concurrency/Logging.hpp index 472b9338..5f64d8a6 100644 --- a/include/leanstore/concurrency/Logging.hpp +++ b/include/leanstore/concurrency/Logging.hpp @@ -4,6 +4,7 @@ #include "leanstore/concurrency/Transaction.hpp" #include "leanstore/sync/OptimisticGuarded.hpp" +#include #include #include #include @@ -23,16 +24,20 @@ struct WalFlushReq { //! The offset in the wal ring buffer. uint64_t mWalBuffered = 0; - //! GSN of the current WAL record. - uint64_t mCurrGSN = 0; + //! The maximum system transaction ID written by the worker. + //! NOTE: can only be updated when all the WAL entries belonging to the system transaction are + //! written to the wal ring buffer. + TXID mSysTxWrittern = 0; //! ID of the current transaction. + //! NOTE: can only be updated when all the WAL entries belonging to the user transaction are + //! written to the wal ring buffer.
TXID mCurrTxId = 0; - WalFlushReq(uint64_t walBuffered = 0, uint64_t currGSN = 0, TXID currTxId = 0) + WalFlushReq(uint64_t walBuffered = 0, uint64_t sysTxWrittern = 0, TXID currTxId = 0) : mVersion(0), mWalBuffered(walBuffered), - mCurrGSN(currGSN), + mSysTxWrittern(sysTxWrittern), mCurrTxId(currTxId) { } }; @@ -84,8 +89,8 @@ class Logging { //! Used to track the write order of wal entries. LID mLsnClock = 0; - //! Used to track transaction dependencies. - uint64_t mGSNClock = 0; + //! The maximum written system transaction ID in the worker. + TXID mSysTxWrittern = 0; //! The written offset of the wal ring buffer. uint64_t mWalBuffered = 0; @@ -94,24 +99,15 @@ class Logging { //! by the worker thread then flushed to disk file by the group commit thread. std::atomic<uint64_t> mWalFlushed = 0; - //! The global min flushed GSN when transaction started. Pages whose GSN larger than this value - //! might be modified by other transactions running at the same time, which cause the remote - //! transaction dependency. - uint64_t mTxReadSnapshot; - - //! Whether the active transaction has accessed data written by other worker transactions, i.e. - //! dependens on the transactions on other workers. - bool mHasRemoteDependency = false; - //! The first WAL record of the current active transaction. uint64_t mTxWalBegin; public: - inline void UpdateSignaledCommitTs(const LID signaledCommitTs) { + void UpdateSignaledCommitTs(const LID signaledCommitTs) { mSignaledCommitTs.store(signaledCommitTs, std::memory_order_release); } - inline bool SafeToCommit(const TXID commitTs) { + bool SafeToCommit(const TXID commitTs) { return commitTs <= mSignaledCommitTs.load(); } @@ -125,19 +121,15 @@ class Logging { void WriteWalCarriageReturn(); template <typename T, typename... Args> - WalPayloadHandler<T> ReserveWALEntryComplex(uint64_t payloadSize, PID pageId, LID gsn, + WalPayloadHandler<T> ReserveWALEntryComplex(uint64_t payloadSize, PID pageId, LID psn, TREEID treeId, Args&&... args); //! Submits wal record to group committer when it is ready to flush to disk. //! @param totalSize size of the wal record to be flush. void SubmitWALEntryComplex(uint64_t totalSize); - inline uint64_t GetCurrentGsn() { - return mGSNClock; - } - - inline void SetCurrentGsn(uint64_t gsn) { - mGSNClock = gsn; + void UpdateSysTxWrittern(TXID sysTxId) { + mSysTxWrittern = std::max(mSysTxWrittern, sysTxId); } private: diff --git a/include/leanstore/concurrency/LoggingImpl.hpp b/include/leanstore/concurrency/LoggingImpl.hpp index a74d577e..d9591b73 100644 --- a/include/leanstore/concurrency/LoggingImpl.hpp +++ b/include/leanstore/concurrency/LoggingImpl.hpp @@ -8,7 +8,7 @@ namespace leanstore::cr { template <typename T, typename... Args> -WalPayloadHandler<T> Logging::ReserveWALEntryComplex(uint64_t payloadSize, PID pageId, LID gsn, +WalPayloadHandler<T> Logging::ReserveWALEntryComplex(uint64_t payloadSize, PID pageId, LID psn, TREEID treeId, Args&&... 
args) { // write transaction start on demand auto prevLsn = mPrevLSN; @@ -28,7 +28,7 @@ WalPayloadHandler Logging::ReserveWALEntryComplex(uint64_t payloadSize, PID p mActiveWALEntryComplex = new (entryPtr) WalEntryComplex(entryLSN, prevLsn, entrySize, WorkerContext::My().mWorkerId, - ActiveTx().mStartTs, gsn, pageId, treeId); + ActiveTx().mStartTs, psn, pageId, treeId); auto* payloadPtr = mActiveWALEntryComplex->mPayload; auto walPayload = new (payloadPtr) T(std::forward(args)...); diff --git a/include/leanstore/concurrency/Transaction.hpp b/include/leanstore/concurrency/Transaction.hpp index 31af7a77..63c799ac 100644 --- a/include/leanstore/concurrency/Transaction.hpp +++ b/include/leanstore/concurrency/Transaction.hpp @@ -44,10 +44,14 @@ class Transaction { //! mCommitTs is the commit timestamp of the transaction. TXID mCommitTs = 0; - //! mMaxObservedGSN is the maximum observed global sequence number during - //! transaction processing. It's used to determine whether a transaction can - //! be committed. - LID mMaxObservedGSN = 0; + //! Maximum observed system transaction id during transaction processing. Used to track + //! transaction dependencies. + TXID mMaxObservedSysTxId = 0; + + //! Whether the transaction has any remote dependencies. Currently, we only support SI isolation + //! level, a user transaction can only depend on a system transaction executed in a remote worker + //! thread. + bool mHasRemoteDependency = false; //! mTxMode is the mode of the current transaction. TxMode mTxMode = TxMode::kShortRunning; @@ -66,20 +70,21 @@ class Transaction { bool mWalExceedBuffer = false; public: - inline bool IsLongRunning() { + bool IsLongRunning() { return mTxMode == TxMode::kLongRunning; } - inline bool AtLeastSI() { + bool AtLeastSI() { return mTxIsolationLevel >= IsolationLevel::kSnapshotIsolation; } // Start a new transaction, initialize all fields - inline void Start(TxMode mode, IsolationLevel level) { + void Start(TxMode mode, IsolationLevel level) { mState = TxState::kStarted; mStartTs = 0; mCommitTs = 0; - mMaxObservedGSN = 0; + mMaxObservedSysTxId = 0; + mHasRemoteDependency = false; mTxMode = mode; mTxIsolationLevel = level; mHasWrote = false; @@ -87,8 +92,9 @@ class Transaction { mWalExceedBuffer = false; } - inline bool CanCommit(uint64_t minFlushedGSN, TXID minFlushedTxId) { - return mMaxObservedGSN <= minFlushedGSN && mStartTs <= minFlushedTxId; + //! Check whether a user transaction with remote dependencies can be committed. + bool CanCommit(TXID minFlushedSysTx, TXID minFlushedUsrTx) { + return mMaxObservedSysTxId <= minFlushedSysTx && mStartTs <= minFlushedUsrTx; } }; diff --git a/include/leanstore/concurrency/WalEntry.hpp b/include/leanstore/concurrency/WalEntry.hpp index a52aa2ff..a064e166 100644 --- a/include/leanstore/concurrency/WalEntry.hpp +++ b/include/leanstore/concurrency/WalEntry.hpp @@ -98,9 +98,8 @@ class __attribute__((packed)) WalEntryComplex : public WalEntry { //! ID of the transaction who creates this WalEntry. TXID mTxId; - //! Global sequence number of the WalEntry, indicate the global order of the - //! WAL entry. - uint64_t mGsn; + //! Page sequence number of the WalEntry. + uint64_t mPsn; //! The page ID of the WalEntry, used to identify the btree node together with //! 
btree ID @@ -116,7 +115,7 @@ class __attribute__((packed)) WalEntryComplex : public WalEntry { WalEntryComplex() = default; - WalEntryComplex(LID lsn, LID prevLsn, uint64_t size, WORKERID workerId, TXID txid, LID gsn, + WalEntryComplex(LID lsn, LID prevLsn, uint64_t size, WORKERID workerId, TXID txid, LID psn, PID pageId, TREEID treeId) : WalEntry(Type::kComplex), mCrc32(0), @@ -125,7 +124,7 @@ class __attribute__((packed)) WalEntryComplex : public WalEntry { mSize(size), mWorkerId(workerId), mTxId(txid), - mGsn(gsn), + mPsn(psn), mPageId(pageId), mTreeId(treeId) { } @@ -185,7 +184,7 @@ const char kType[] = "mType"; const char kTxId[] = "mTxId"; const char kWorkerId[] = "mWorkerId"; const char kPrevLsn[] = "mPrevLsn"; -const char kGsn[] = "mGsn"; +const char kPsn[] = "mPsn"; const char kTreeId[] = "mTreeId"; const char kPageId[] = "mPageId"; diff --git a/include/leanstore/concurrency/WorkerContext.hpp b/include/leanstore/concurrency/WorkerContext.hpp index cd115a81..097f6be6 100644 --- a/include/leanstore/concurrency/WorkerContext.hpp +++ b/include/leanstore/concurrency/WorkerContext.hpp @@ -47,22 +47,24 @@ class WorkerContext { //! All the workers. std::vector& mAllWorkers; -public: WorkerContext(uint64_t workerId, std::vector& allWorkers, leanstore::LeanStore* store); ~WorkerContext(); -public: + //! Whether a user transaction is started. bool IsTxStarted() { return mActiveTx.mState == TxState::kStarted; } + //! Starts a user transaction. void StartTx(TxMode mode = TxMode::kShortRunning, IsolationLevel level = IsolationLevel::kSnapshotIsolation, bool isReadOnly = false); + //! Commits a user transaction. void CommitTx(); + //! Aborts a user transaction. void AbortTx(); public: @@ -76,7 +78,6 @@ class WorkerContext { static constexpr uint64_t kLongRunningBit = (1ull << 62); static constexpr uint64_t kCleanBitsMask = ~(kRcBit | kLongRunningBit); -public: static WorkerContext& My() { return *WorkerContext::sTlsWorkerCtxRaw; } diff --git a/src/LeanStore.cpp b/src/LeanStore.cpp index 8ac1aa56..72a87403 100644 --- a/src/LeanStore.cpp +++ b/src/LeanStore.cpp @@ -171,8 +171,8 @@ LeanStore::~LeanStore() { auto treeId = it.first; auto& [treePtr, treeName] = it.second; auto* btree = dynamic_cast(treePtr.get()); - Log::Info("leanstore/btreeName={}, btreeId={}, btreeType={}, btreeHeight={}", treeName, treeId, - static_cast(btree->mTreeType), btree->GetHeight()); + Log::Info("leanstore/btreeName={}, btreeId={}, btreeType={}, btreeSummary={}", treeName, treeId, + static_cast(btree->mTreeType), btree->Summary()); } // Stop transaction workers and group committer @@ -305,6 +305,12 @@ void LeanStore::serializeMeta(bool allPagesUpToDate) { rapidjson::Value btreeJsonId(btreeId); btreeJsonObj.AddMember("id", btreeJsonId, allocator); + rapidjson::Value btreeEnableWal(btree->mConfig.mEnableWal); + btreeJsonObj.AddMember("enable_wal", btreeEnableWal, allocator); + + rapidjson::Value btreeUseBulkInsert(btree->mConfig.mUseBulkInsert); + btreeJsonObj.AddMember("use_bulk_insert", btreeUseBulkInsert, allocator); + auto btreeMetaMap = mTreeRegistry->Serialize(btreeId); rapidjson::Value btreeMetaJsonObj(rapidjson::kObjectType); for (const auto& [key, val] : btreeMetaMap) { @@ -389,6 +395,8 @@ bool LeanStore::deserializeMeta() { const TREEID btreeId = btreeJsonObj["id"].GetInt64(); const auto btreeType = btreeJsonObj["type"].GetInt(); const std::string btreeName = btreeJsonObj["name"].GetString(); + const auto btreeEnableWal = btreeJsonObj["enable_wal"].GetBool(); + const auto btreeUseBulkInsert = 
btreeJsonObj["use_bulk_insert"].GetBool(); StringMap btreeMetaMap; auto& btreeMetaJsonObj = btreeJsonObj["serialized"]; @@ -401,12 +409,16 @@ bool LeanStore::deserializeMeta() { case leanstore::storage::btree::BTreeType::kBasicKV: { auto btree = std::make_unique(); btree->mStore = this; + btree->mConfig = + BTreeConfig{.mEnableWal = btreeEnableWal, .mUseBulkInsert = btreeUseBulkInsert}; mTreeRegistry->RegisterTree(btreeId, std::move(btree), btreeName); break; } case leanstore::storage::btree::BTreeType::kTransactionKV: { auto btree = std::make_unique(); btree->mStore = this; + btree->mConfig = + BTreeConfig{.mEnableWal = btreeEnableWal, .mUseBulkInsert = btreeUseBulkInsert}; // create graveyard ExecSync(0, [&]() { auto graveyardName = std::format("_{}_graveyard", btreeName); diff --git a/src/btree/BasicKV.cpp b/src/btree/BasicKV.cpp index dd57dd49..51d57e9d 100644 --- a/src/btree/BasicKV.cpp +++ b/src/btree/BasicKV.cpp @@ -345,7 +345,8 @@ OpCode BasicKV::RangeRemove(Slice startKey, Slice endKey, bool pageWise) { if (guardedLeaf->FreeSpaceAfterCompaction() >= BTreeNode::UnderFullSize()) { xIter.SetCleanUpCallback([&, toMerge = guardedLeaf.mBf] { JUMPMU_TRY() { - this->TryMergeMayJump(*toMerge); + TXID sysTxId = mStore->AllocSysTxTs(); + this->TryMergeMayJump(sysTxId, *toMerge); } JUMPMU_CATCH() { } diff --git a/src/btree/TransactionKV.cpp b/src/btree/TransactionKV.cpp index 8bff9c8c..5e0d294b 100644 --- a/src/btree/TransactionKV.cpp +++ b/src/btree/TransactionKV.cpp @@ -17,6 +17,7 @@ #include "leanstore/utils/Log.hpp" #include "leanstore/utils/Misc.hpp" #include "leanstore/utils/Result.hpp" +#include "leanstore/utils/UserThread.hpp" #include "telemetry/MetricsManager.hpp" #include @@ -679,7 +680,8 @@ SpaceCheckResult TransactionKV::CheckSpaceUtilization(BufferFrame& bf) { } guardedNode.ToExclusiveMayJump(); - guardedNode.SyncGSNBeforeWrite(); + TXID sysTxId = utils::tlsStore->AllocSysTxTs(); + guardedNode.SyncSystemTxId(sysTxId); for (uint16_t i = 0; i < guardedNode->mNumSlots; i++) { auto& tuple = *Tuple::From(guardedNode->ValData(i)); diff --git a/src/btree/core/BTreeGeneric.cpp b/src/btree/core/BTreeGeneric.cpp index 30d4081f..4f589c18 100644 --- a/src/btree/core/BTreeGeneric.cpp +++ b/src/btree/core/BTreeGeneric.cpp @@ -13,6 +13,7 @@ #include "leanstore/utils/Defer.hpp" #include "leanstore/utils/Log.hpp" #include "leanstore/utils/Misc.hpp" +#include "leanstore/utils/UserThread.hpp" #include "utils/ToJson.hpp" #include @@ -43,13 +44,18 @@ void BTreeGeneric::Init(leanstore::LeanStore* store, TREEID btreeId, BTreeConfig // Record WAL if (mConfig.mEnableWal) { + TXID sysTxId = mStore->AllocSysTxTs(); + auto rootWalHandler = - xGuardedRoot.ReserveWALPayload(0, mTreeId, xGuardedRoot->mIsLeaf); + xGuardedRoot.ReserveWALPayload(0, sysTxId, mTreeId, xGuardedRoot->mIsLeaf); rootWalHandler.SubmitWal(); auto metaWalHandler = - xGuardedMeta.ReserveWALPayload(0, mTreeId, xGuardedMeta->mIsLeaf); + xGuardedMeta.ReserveWALPayload(0, sysTxId, mTreeId, xGuardedMeta->mIsLeaf); metaWalHandler.SubmitWal(); + + xGuardedMeta.SyncSystemTxId(sysTxId); + xGuardedRoot.SyncSystemTxId(sysTxId); } } @@ -61,15 +67,16 @@ PessimisticExclusiveIterator BTreeGeneric::GetExclusiveIterator() { return PessimisticExclusiveIterator(*this); } -void BTreeGeneric::TrySplitMayJump(BufferFrame& toSplit, int16_t favoredSplitPos) { +void BTreeGeneric::TrySplitMayJump(TXID sysTxId, BufferFrame& toSplit, int16_t favoredSplitPos) { auto parentHandler = findParentEager(*this, toSplit); GuardedBufferFrame guardedParent( 
mStore->mBufferManager.get(), std::move(parentHandler.mParentGuard), parentHandler.mParentBf); auto guardedChild = GuardedBufferFrame(mStore->mBufferManager.get(), guardedParent, parentHandler.mChildSwip); if (guardedChild->mNumSlots <= 1) { - Log::Warn("Split failed, slots too less: pageId={}, favoredSplitPos={}, numSlots={}", - toSplit.mHeader.mPageId, favoredSplitPos, guardedChild->mNumSlots); + Log::Warn( + "Split failed, slots too less: sysTxId={}, pageId={}, favoredSplitPos={}, numSlots={}", + sysTxId, toSplit.mHeader.mPageId, favoredSplitPos, guardedChild->mNumSlots); return; } @@ -91,7 +98,7 @@ void BTreeGeneric::TrySplitMayJump(BufferFrame& toSplit, int16_t favoredSplitPos // split the root node if (isMetaNode(guardedParent)) { - splitRootMayJump(guardedParent, guardedChild, sepInfo); + splitRootMayJump(sysTxId, guardedParent, guardedChild, sepInfo); return; } @@ -102,12 +109,12 @@ void BTreeGeneric::TrySplitMayJump(BufferFrame& toSplit, int16_t favoredSplitPos if (!guardedParent->HasEnoughSpaceFor(spaceNeededForSeparator)) { guardedParent.unlock(); guardedChild.unlock(); - TrySplitMayJump(*guardedParent.mBf); + TrySplitMayJump(sysTxId, *guardedParent.mBf); return; } // split the non-root node - splitNonRootMayJump(guardedParent, guardedChild, sepInfo, spaceNeededForSeparator); + splitNonRootMayJump(sysTxId, guardedParent, guardedChild, sepInfo, spaceNeededForSeparator); } //! Split the root node, 4 nodes are involved in the split: @@ -130,7 +137,7 @@ void BTreeGeneric::TrySplitMayJump(BufferFrame& toSplit, int16_t favoredSplitPos //! - insert separator key into new root //! - update meta node to point to new root /// -void BTreeGeneric::splitRootMayJump(GuardedBufferFrame& guardedMeta, +void BTreeGeneric::splitRootMayJump(TXID sysTxId, GuardedBufferFrame& guardedMeta, GuardedBufferFrame& guardedOldRoot, const BTreeNode::SeparatorInfo& sepInfo) { auto xGuardedMeta = ExclusiveGuardedBufferFrame(std::move(guardedMeta)); @@ -145,7 +152,8 @@ void BTreeGeneric::splitRootMayJump(GuardedBufferFrame& guardedMeta, auto guardedNewLeft = GuardedBufferFrame(bm, newLeftBf); auto xGuardedNewLeft = ExclusiveGuardedBufferFrame(std::move(guardedNewLeft)); if (mConfig.mEnableWal) { - xGuardedNewLeft.WriteWal(0, mTreeId, xGuardedOldRoot->mIsLeaf); + xGuardedNewLeft.SyncSystemTxId(sysTxId); + xGuardedNewLeft.WriteWal(0, sysTxId, mTreeId, xGuardedOldRoot->mIsLeaf); } xGuardedNewLeft.InitPayload(xGuardedOldRoot->mIsLeaf); @@ -154,14 +162,15 @@ void BTreeGeneric::splitRootMayJump(GuardedBufferFrame& guardedMeta, auto guardedNewRoot = GuardedBufferFrame(bm, newRootBf); auto xGuardedNewRoot = ExclusiveGuardedBufferFrame(std::move(guardedNewRoot)); if (mConfig.mEnableWal) { - xGuardedNewRoot.WriteWal(0, mTreeId, false); + xGuardedNewRoot.SyncSystemTxId(sysTxId); + xGuardedNewRoot.WriteWal(0, sysTxId, mTreeId, false); } xGuardedNewRoot.InitPayload(false); // 3.1. write wal on demand if (mConfig.mEnableWal) { - xGuardedMeta.SyncGSNBeforeWrite(); - xGuardedOldRoot.WriteWal(0, xGuardedNewLeft.bf()->mHeader.mPageId, + xGuardedOldRoot.SyncSystemTxId(sysTxId); + xGuardedOldRoot.WriteWal(0, sysTxId, xGuardedNewLeft.bf()->mHeader.mPageId, xGuardedNewRoot.bf()->mHeader.mPageId, xGuardedMeta.bf()->mHeader.mPageId, sepInfo); } @@ -183,7 +192,7 @@ void BTreeGeneric::splitRootMayJump(GuardedBufferFrame& guardedMeta, //! | | | //! 
child newLeft child /// -void BTreeGeneric::splitNonRootMayJump(GuardedBufferFrame& guardedParent, +void BTreeGeneric::splitNonRootMayJump(TXID sysTxId, GuardedBufferFrame& guardedParent, GuardedBufferFrame& guardedChild, const BTreeNode::SeparatorInfo& sepInfo, uint16_t spaceNeededForSeparator) { @@ -198,14 +207,16 @@ void BTreeGeneric::splitNonRootMayJump(GuardedBufferFrame& guardedPar auto guardedNewLeft = GuardedBufferFrame(mStore->mBufferManager.get(), newLeftBf); auto xGuardedNewLeft = ExclusiveGuardedBufferFrame(std::move(guardedNewLeft)); if (mConfig.mEnableWal) { - xGuardedNewLeft.WriteWal(0, mTreeId, xGuardedChild->mIsLeaf); + xGuardedNewLeft.SyncSystemTxId(sysTxId); + xGuardedNewLeft.WriteWal(0, sysTxId, mTreeId, xGuardedChild->mIsLeaf); } xGuardedNewLeft.InitPayload(xGuardedChild->mIsLeaf); // 2.1. write wal on demand or simply mark as dirty if (mConfig.mEnableWal) { - xGuardedParent.SyncGSNBeforeWrite(); - xGuardedChild.WriteWal(0, xGuardedParent.bf()->mHeader.mPageId, + xGuardedParent.SyncSystemTxId(sysTxId); + xGuardedChild.SyncSystemTxId(sysTxId); + xGuardedChild.WriteWal(0, sysTxId, xGuardedParent.bf()->mHeader.mPageId, xGuardedNewLeft.bf()->mHeader.mPageId, sepInfo); } @@ -216,7 +227,7 @@ void BTreeGeneric::splitNonRootMayJump(GuardedBufferFrame& guardedPar xGuardedChild->Split(xGuardedParent, xGuardedNewLeft, sepInfo); } -bool BTreeGeneric::TryMergeMayJump(BufferFrame& toMerge, bool swizzleSibling) { +bool BTreeGeneric::TryMergeMayJump(TXID sysTxId, BufferFrame& toMerge, bool swizzleSibling) { auto parentHandler = findParentEager(*this, toMerge); GuardedBufferFrame guardedParent( mStore->mBufferManager.get(), std::move(parentHandler.mParentGuard), parentHandler.mParentBf); @@ -262,9 +273,9 @@ bool BTreeGeneric::TryMergeMayJump(BufferFrame& toMerge, bool swizzleSibling) { } if (mConfig.mEnableWal) { - guardedParent.SyncGSNBeforeWrite(); - guardedChild.SyncGSNBeforeWrite(); - guardedLeft.SyncGSNBeforeWrite(); + guardedParent.SyncSystemTxId(sysTxId); + guardedChild.SyncSystemTxId(sysTxId); + guardedLeft.SyncSystemTxId(sysTxId); } xGuardedLeft.Reclaim(); @@ -295,9 +306,9 @@ bool BTreeGeneric::TryMergeMayJump(BufferFrame& toMerge, bool swizzleSibling) { } if (mConfig.mEnableWal) { - guardedParent.SyncGSNBeforeWrite(); - guardedChild.SyncGSNBeforeWrite(); - guardedRight.SyncGSNBeforeWrite(); + guardedParent.SyncSystemTxId(sysTxId); + guardedChild.SyncSystemTxId(sysTxId); + guardedRight.SyncSystemTxId(sysTxId); } xGuardedChild.Reclaim(); @@ -310,7 +321,7 @@ bool BTreeGeneric::TryMergeMayJump(BufferFrame& toMerge, bool swizzleSibling) { if (!isMetaNode(guardedParent) && guardedParent->FreeSpaceAfterCompaction() >= BTreeNode::UnderFullSize()) { JUMPMU_TRY() { - TryMergeMayJump(*guardedParent.mBf, true); + TryMergeMayJump(sysTxId, *guardedParent.mBf, true); } JUMPMU_CATCH() { COUNTERS_BLOCK() { @@ -477,7 +488,9 @@ BTreeGeneric::XMergeReturnCode BTreeGeneric::XMerge(GuardedBufferFrame xGuardedParent = std::move(guardedParent); - xGuardedParent.SyncGSNBeforeWrite(); + // TODO(zz-jason): support wal and sync system tx id + // TXID sysTxId = utils::tlsStore->AllocSysTxTs(); + // xGuardedParent.SyncSystemTxId(sysTxId); XMergeReturnCode retCode = XMergeReturnCode::kPartialMerge; int16_t leftHand, rightHand, ret; @@ -497,8 +510,9 @@ BTreeGeneric::XMergeReturnCode BTreeGeneric::XMerge(GuardedBufferFrame xGuardedRight( std::move(guardedNodes[rightHand - pos])); ExclusiveGuardedBufferFrame xGuardedLeft(std::move(guardedNodes[leftHand - pos])); - xGuardedRight.SyncGSNBeforeWrite(); - 
xGuardedLeft.SyncGSNBeforeWrite(); + // TODO(zz-jason): support wal and sync system tx id + // xGuardedRight.SyncSystemTxId(sysTxId); + // xGuardedLeft.SyncSystemTxId(sysTxId); maxRight = leftHand; ret = mergeLeftIntoRight(xGuardedParent, leftHand, xGuardedLeft, xGuardedRight, leftHand == pos); @@ -614,71 +628,69 @@ void BTreeGeneric::Deserialize(StringMap map) { mMetaNodeSwip.AsBufferFrame().mPage.mBTreeId); } -// void BTreeGeneric::ToJson(BTreeGeneric& btree, rapidjson::Document* resultDoc) { -// LS_DCHECK(resultDoc->IsObject()); -// auto& allocator = resultDoc->GetAllocator(); -// -// // meta node -// GuardedBufferFrame guardedMetaNode(btree.mStore->mBufferManager.get(), -// btree.mMetaNodeSwip); -// rapidjson::Value metaJson(rapidjson::kObjectType); -// utils::ToJson(guardedMetaNode.mBf, &metaJson, &allocator); -// resultDoc->AddMember("metaNode", metaJson, allocator); -// -// // root node -// GuardedBufferFrame guardedRootNode(btree.mStore->mBufferManager.get(), -// guardedMetaNode, -// guardedMetaNode->mRightMostChildSwip); -// rapidjson::Value rootJson(rapidjson::kObjectType); -// toJsonRecursive(btree, guardedRootNode, &rootJson, allocator); -// resultDoc->AddMember("rootNode", rootJson, allocator); -// } -// -// void BTreeGeneric::toJsonRecursive(BTreeGeneric& btree, GuardedBufferFrame& -// guardedNode, -// rapidjson::Value* resultObj, -// rapidjson::Value::AllocatorType& allocator) { -// -// LS_DCHECK(resultObj->IsObject()); -// // buffer frame header -// utils::ToJson(guardedNode.mBf, resultObj, &allocator); -// -// // btree node -// { -// rapidjson::Value nodeObj(rapidjson::kObjectType); -// utils::ToJson(guardedNode.ptr(), &nodeObj, &allocator); -// resultObj->AddMember("pagePayload(btreeNode)", nodeObj, allocator); -// } -// -// if (guardedNode->mIsLeaf) { -// return; -// } -// -// rapidjson::Value childrenJson(rapidjson::kArrayType); -// for (auto i = 0u; i < guardedNode->mNumSlots; ++i) { -// auto* childSwip = guardedNode->ChildSwip(i); -// GuardedBufferFrame guardedChild(btree.mStore->mBufferManager.get(), guardedNode, -// *childSwip); -// -// rapidjson::Value childObj(rapidjson::kObjectType); -// toJsonRecursive(btree, guardedChild, &childObj, allocator); -// guardedChild.unlock(); -// -// childrenJson.PushBack(childObj, allocator); -// } -// -// if (guardedNode->mRightMostChildSwip != nullptr) { -// GuardedBufferFrame guardedChild(btree.mStore->mBufferManager.get(), guardedNode, -// guardedNode->mRightMostChildSwip); -// rapidjson::Value childObj(rapidjson::kObjectType); -// toJsonRecursive(btree, guardedChild, &childObj, allocator); -// guardedChild.unlock(); -// -// childrenJson.PushBack(childObj, allocator); -// } -// -// // children -// resultObj->AddMember("mChildren", childrenJson, allocator); -// } +void BTreeGeneric::ToJson(BTreeGeneric& btree, rapidjson::Document* resultDoc) { + LS_DCHECK(resultDoc->IsObject()); + auto& allocator = resultDoc->GetAllocator(); + + // meta node + GuardedBufferFrame guardedMetaNode(btree.mStore->mBufferManager.get(), + btree.mMetaNodeSwip); + rapidjson::Value metaJson(rapidjson::kObjectType); + utils::ToJson(guardedMetaNode.mBf, &metaJson, &allocator); + resultDoc->AddMember("metaNode", metaJson, allocator); + + // root node + GuardedBufferFrame guardedRootNode(btree.mStore->mBufferManager.get(), guardedMetaNode, + guardedMetaNode->mRightMostChildSwip); + rapidjson::Value rootJson(rapidjson::kObjectType); + toJsonRecursive(btree, guardedRootNode, &rootJson, allocator); + resultDoc->AddMember("rootNode", rootJson, allocator); +} + 
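ToJson and its recursive helper are re-enabled above rather than left commented out; ToJson emits the meta node, then delegates to toJsonRecursive (defined next) for a depth-first walk of the root subtree. A hedged usage sketch for dumping a tree during debugging; DumpBTree and its btree argument are illustrative names, assuming only the rapidjson API:

```cpp
#include "leanstore/btree/core/BTreeGeneric.hpp"

#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

#include <iostream>

// Serialize a btree (meta node first, then the root subtree) and print it.
void DumpBTree(leanstore::storage::btree::BTreeGeneric& btree) {
  rapidjson::Document doc;
  doc.SetObject();
  leanstore::storage::btree::BTreeGeneric::ToJson(btree, &doc);

  rapidjson::StringBuffer buffer;
  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
  doc.Accept(writer);
  std::cout << buffer.GetString() << std::endl;
}
```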
+void BTreeGeneric::toJsonRecursive(BTreeGeneric& btree, GuardedBufferFrame& guardedNode, + rapidjson::Value* resultObj, + rapidjson::Value::AllocatorType& allocator) { + + LS_DCHECK(resultObj->IsObject()); + // buffer frame header + utils::ToJson(guardedNode.mBf, resultObj, &allocator); + + // btree node + { + rapidjson::Value nodeObj(rapidjson::kObjectType); + utils::ToJson(guardedNode.ptr(), &nodeObj, &allocator); + resultObj->AddMember("pagePayload(btreeNode)", nodeObj, allocator); + } + + if (guardedNode->mIsLeaf) { + return; + } + + rapidjson::Value childrenJson(rapidjson::kArrayType); + for (auto i = 0u; i < guardedNode->mNumSlots; ++i) { + auto* childSwip = guardedNode->ChildSwip(i); + GuardedBufferFrame guardedChild(btree.mStore->mBufferManager.get(), guardedNode, + *childSwip); + + rapidjson::Value childObj(rapidjson::kObjectType); + toJsonRecursive(btree, guardedChild, &childObj, allocator); + guardedChild.unlock(); + + childrenJson.PushBack(childObj, allocator); + } + + if (guardedNode->mRightMostChildSwip != nullptr) { + GuardedBufferFrame guardedChild(btree.mStore->mBufferManager.get(), guardedNode, + guardedNode->mRightMostChildSwip); + rapidjson::Value childObj(rapidjson::kObjectType); + toJsonRecursive(btree, guardedChild, &childObj, allocator); + guardedChild.unlock(); + + childrenJson.PushBack(childObj, allocator); + } + + // children + resultObj->AddMember("mChildren", childrenJson, allocator); +} } // namespace leanstore::storage::btree diff --git a/src/btree/core/BTreeWalPayload.cpp b/src/btree/core/BTreeWalPayload.cpp index d8824a28..04f69bfe 100644 --- a/src/btree/core/BTreeWalPayload.cpp +++ b/src/btree/core/BTreeWalPayload.cpp @@ -166,6 +166,13 @@ void WalPayload::toJson(const WalTxRemove* wal [[maybe_unused]], } void WalPayload::toJson(const WalInitPage* wal, rapidjson::Document* doc) { + // mSysTxId + { + rapidjson::Value member; + member.SetInt64(wal->mSysTxId); + doc->AddMember("mSysTxId", member, doc->GetAllocator()); + } + // mTreeId { rapidjson::Value member; @@ -182,6 +189,12 @@ void WalPayload::toJson(const WalInitPage* wal, rapidjson::Document* doc) { } void WalPayload::toJson(const WalSplitRoot* wal, rapidjson::Document* doc) { + { + rapidjson::Value member; + member.SetUint64(wal->mSysTxId); + doc->AddMember("mSysTxId", member, doc->GetAllocator()); + } + { rapidjson::Value member; member.SetUint64(wal->mNewLeft); @@ -220,6 +233,12 @@ void WalPayload::toJson(const WalSplitRoot* wal, rapidjson::Document* doc) { } void WalPayload::toJson(const WalSplitNonRoot* wal, rapidjson::Document* doc) { + { + rapidjson::Value member; + member.SetUint64(wal->mSysTxId); + doc->AddMember("mSysTxId", member, doc->GetAllocator()); + } + // mParentPageId { rapidjson::Value member; diff --git a/src/btree/core/BTreeWalPayload.hpp b/src/btree/core/BTreeWalPayload.hpp index ef4b9a2a..93ca9c04 100644 --- a/src/btree/core/BTreeWalPayload.hpp +++ b/src/btree/core/BTreeWalPayload.hpp @@ -11,13 +11,15 @@ namespace leanstore::storage::btree { -// forward declarations +// WAL generated by user transactions class WalInsert; class WalTxInsert; class WalUpdate; class WalTxUpdate; class WalRemove; class WalTxRemove; + +//! 
WAL generated by system transactions class WalInitPage; class WalSplitRoot; class WalSplitNonRoot; @@ -267,12 +269,15 @@ class WalTxRemove : public WalPayload { class WalInitPage : public WalPayload { public: + TXID mSysTxId; + TREEID mTreeId; bool mIsLeaf; - WalInitPage(TREEID treeId, bool isLeaf) + WalInitPage(TXID sysTxId, TREEID treeId, bool isLeaf) : WalPayload(Type::kWalInitPage), + mSysTxId(sysTxId), mTreeId(treeId), mIsLeaf(isLeaf) { } @@ -280,6 +285,8 @@ class WalInitPage : public WalPayload { class WalSplitRoot : public WalPayload { public: + TXID mSysTxId; + PID mNewLeft; PID mNewRoot; @@ -292,8 +299,10 @@ class WalSplitRoot : public WalPayload { bool mSeparatorTruncated; - WalSplitRoot(PID newLeft, PID newRoot, PID metaNode, const BTreeNode::SeparatorInfo& sepInfo) + WalSplitRoot(TXID sysTxId, PID newLeft, PID newRoot, PID metaNode, + const BTreeNode::SeparatorInfo& sepInfo) : WalPayload(Type::kWalSplitRoot), + mSysTxId(sysTxId), mNewLeft(newLeft), mNewRoot(newRoot), mMetaNode(metaNode), @@ -305,9 +314,11 @@ class WalSplitRoot : public WalPayload { class WalSplitNonRoot : public WalPayload { public: - PID mParentPageId = -1; + TXID mSysTxId; - PID mNewLeft = -1; + PID mParentPageId; + + PID mNewLeft; uint16_t mSplitSlot; @@ -318,8 +329,9 @@ class WalSplitNonRoot : public WalPayload { WalSplitNonRoot() : WalPayload(Type::kWalSplitNonRoot) { } - WalSplitNonRoot(PID parent, PID newLeft, const BTreeNode::SeparatorInfo& sepInfo) + WalSplitNonRoot(TXID sysTxId, PID parent, PID newLeft, const BTreeNode::SeparatorInfo& sepInfo) : WalPayload(Type::kWalSplitNonRoot), + mSysTxId(sysTxId), mParentPageId(parent), mNewLeft(newLeft), mSplitSlot(sepInfo.mSlotId), diff --git a/src/buffer-manager/AsyncWriteBuffer.cpp b/src/buffer-manager/AsyncWriteBuffer.cpp index c42ca080..b0d7a523 100644 --- a/src/buffer-manager/AsyncWriteBuffer.cpp +++ b/src/buffer-manager/AsyncWriteBuffer.cpp @@ -48,16 +48,17 @@ Result AsyncWriteBuffer::WaitAll() { return mAIo.WaitAll(); } -void AsyncWriteBuffer::IterateFlushedBfs(std::function callback, - uint64_t numFlushedBfs) { +void AsyncWriteBuffer::IterateFlushedBfs( + std::function callback, + uint64_t numFlushedBfs) { for (uint64_t i = 0; i < numFlushedBfs; i++) { const auto slot = (reinterpret_cast(mAIo.GetIoEvent(i)->data) - reinterpret_cast(mWriteBuffer.Get())) / mPageSize; auto* flushedPage = reinterpret_cast(getWriteBuffer(slot)); - auto flushedGsn = flushedPage->mGSN; + auto flushedPsn = flushedPage->mPsn; auto* flushedBf = mWriteCommands[slot].mBf; - callback(*const_cast(flushedBf), flushedGsn); + callback(*const_cast(flushedBf), flushedPsn); } } diff --git a/src/buffer-manager/BufferManager.cpp b/src/buffer-manager/BufferManager.cpp index d579c9f7..957c2aa1 100644 --- a/src/buffer-manager/BufferManager.cpp +++ b/src/buffer-manager/BufferManager.cpp @@ -162,7 +162,7 @@ Result BufferManager::CheckpointAllBufferFrames() { auto pageOffset = bf.mHeader.mPageId * pageSize; mStore->mTreeRegistry->Checkpoint(bf.mPage.mBTreeId, bf, tmpBuffer); aio.PrepareWrite(mStore->mPageFd, tmpBuffer, pageSize, pageOffset); - bf.mHeader.mFlushedGsn = bf.mPage.mGSN; + bf.mHeader.mFlushedPsn = bf.mPage.mPsn; batchSize++; } } @@ -192,7 +192,7 @@ Result BufferManager::CheckpointBufferFrame(BufferFrame& bf) { if (!res) { return std::unexpected(std::move(res.error())); } - bf.mHeader.mFlushedGsn = bf.mPage.mGSN; + bf.mHeader.mFlushedPsn = bf.mPage.mPsn; } bf.mHeader.mLatch.UnlockExclusively(); return {}; @@ -226,7 +226,7 @@ BufferFrame& BufferManager::AllocNewPageMayJump(TREEID treeId) 
{ } freeBf.mPage.mBTreeId = treeId; - freeBf.mPage.mGSN++; // mark as dirty + freeBf.mPage.mPsn++; // mark the page as dirty LS_DLOG("Alloc new page, pageId={}, btreeId={}", freeBf.mHeader.mPageId, freeBf.mPage.mBTreeId); return freeBf; } @@ -308,7 +308,7 @@ BufferFrame* BufferManager::ResolveSwipMayJump(HybridGuard& nodeGuard, Swip& swi // 4. Intialize the buffer frame header LS_DCHECK(!bf.mHeader.mIsBeingWrittenBack); - bf.mHeader.mFlushedGsn = bf.mPage.mGSN; + bf.mHeader.mFlushedPsn = bf.mPage.mPsn; bf.mHeader.mState = State::kLoaded; bf.mHeader.mPageId = pageId; if (mStore->mStoreOption->mEnableBufferCrcCheck) { diff --git a/src/buffer-manager/PageEvictor.cpp b/src/buffer-manager/PageEvictor.cpp index cd628b9a..405a1ca5 100644 --- a/src/buffer-manager/PageEvictor.cpp +++ b/src/buffer-manager/PageEvictor.cpp @@ -313,7 +313,7 @@ void PageEvictor::FlushAndRecycleBufferFrames(Partition& targetPartition) { auto numFlushedBfs = result.value(); mAsyncWriteBuffer.IterateFlushedBfs( - [&](BufferFrame& writtenBf, uint64_t flushedGsn) { + [&](BufferFrame& writtenBf, uint64_t flushedPsn) { JUMPMU_TRY() { // When the written back page is being exclusively locked, we // should rather waste the write and move on to another page @@ -323,10 +323,10 @@ void PageEvictor::FlushAndRecycleBufferFrames(Partition& targetPartition) { BMOptimisticGuard optimisticGuard(writtenBf.mHeader.mLatch); BMExclusiveGuard exclusiveGuard(optimisticGuard); LS_DCHECK(writtenBf.mHeader.mIsBeingWrittenBack); - LS_DCHECK(writtenBf.mHeader.mFlushedGsn < flushedGsn); + LS_DCHECK(writtenBf.mHeader.mFlushedPsn < flushedPsn); // For recovery, so much has to be done here... - writtenBf.mHeader.mFlushedGsn = flushedGsn; + writtenBf.mHeader.mFlushedPsn = flushedPsn; writtenBf.mHeader.mIsBeingWrittenBack = false; PPCounters::MyCounters().flushed_pages_counter++; } diff --git a/src/concurrency/CRManager.cpp b/src/concurrency/CRManager.cpp index eb550f5a..d1600b8e 100644 --- a/src/concurrency/CRManager.cpp +++ b/src/concurrency/CRManager.cpp @@ -88,21 +88,23 @@ void CRManager::setupHistoryStorage4EachWorker() { } constexpr char kKeyWalSize[] = "wal_size"; -constexpr char kKeyGlobalLogicalClock[] = "global_logical_clock"; +constexpr char kKeyGlobalUsrTso[] = "global_user_tso"; +constexpr char kKeyGlobalSysTso[] = "global_system_tso"; StringMap CRManager::Serialize() { StringMap map; - uint64_t val = mStore->mTimestampOracle.load(); map[kKeyWalSize] = std::to_string(mGroupCommitter->mWalSize); - map[kKeyGlobalLogicalClock] = std::to_string(val); + map[kKeyGlobalUsrTso] = std::to_string(mStore->mUsrTso.load()); + map[kKeyGlobalSysTso] = std::to_string(mStore->mSysTso.load()); return map; } void CRManager::Deserialize(StringMap map) { - uint64_t val = std::stoull(map[kKeyGlobalLogicalClock]); - mStore->mTimestampOracle = val; - mStore->mCRManager->mGlobalWmkInfo.mWmkOfAllTx = val; mGroupCommitter->mWalSize = std::stoull(map[kKeyWalSize]); + mStore->mUsrTso = std::stoull(map[kKeyGlobalUsrTso]); + mStore->mSysTso = std::stoull(map[kKeyGlobalSysTso]); + + mStore->mCRManager->mGlobalWmkInfo.mWmkOfAllTx = mStore->mUsrTso.load(); } } // namespace leanstore::cr diff --git a/src/concurrency/ConcurrencyControl.cpp b/src/concurrency/ConcurrencyControl.cpp index 3aa02d5f..f186918b 100644 --- a/src/concurrency/ConcurrencyControl.cpp +++ b/src/concurrency/ConcurrencyControl.cpp @@ -6,7 +6,10 @@ #include "leanstore/concurrency/CRManager.hpp" #include "leanstore/concurrency/WorkerContext.hpp" #include "leanstore/profiling/counters/WorkerCounters.hpp" 
+#include "leanstore/sync/HybridLatch.hpp" +#include "leanstore/sync/ScopedHybridGuard.hpp" #include "leanstore/utils/Defer.hpp" +#include "leanstore/utils/JumpMU.hpp" #include "leanstore/utils/Log.hpp" #include "leanstore/utils/Misc.hpp" #include "leanstore/utils/RandomGenerator.hpp" @@ -25,7 +28,7 @@ namespace leanstore::cr { void CommitTree::AppendCommitLog(TXID startTs, TXID commitTs) { LS_DCHECK(mCommitLog.size() < mCapacity); utils::Timer timer(CRCounters::MyCounters().cc_ms_committing); - std::unique_lock xGuard(mMutex); + storage::ScopedHybridGuard xGuard(mLatch, storage::LatchMode::kPessimisticExclusive); mCommitLog.push_back({commitTs, startTs}); LS_DLOG("Commit log appended, workerId={}, startTs={}, commitTs={}", WorkerContext::My().mWorkerId, startTs, commitTs); @@ -65,7 +68,7 @@ void CommitTree::CompactCommitLog() { } // Refill the compacted commit log - std::unique_lock xGuard(mMutex); + storage::ScopedHybridGuard xGuard(mLatch, storage::LatchMode::kPessimisticExclusive); mCommitLog.clear(); for (auto& p : set) { mCommitLog.push_back(p); @@ -78,12 +81,19 @@ void CommitTree::CompactCommitLog() { } TXID CommitTree::Lcb(TXID startTs) { - std::shared_lock guard(mMutex); - - if (auto result = lcbNoLatch(startTs); result) { - return result->second; + while (true) { + JUMPMU_TRY() { + storage::ScopedHybridGuard oGuard(mLatch, storage::LatchMode::kOptimisticOrJump); + if (auto result = lcbNoLatch(startTs); result) { + oGuard.Unlock(); + JUMPMU_RETURN result->second; + } + oGuard.Unlock(); + JUMPMU_RETURN 0; + } + JUMPMU_CATCH() { + } } - return 0; } std::optional> CommitTree::lcbNoLatch(TXID startTs) { diff --git a/src/concurrency/GroupCommitter.cpp b/src/concurrency/GroupCommitter.cpp index 0cbd9faa..5b0cb503 100644 --- a/src/concurrency/GroupCommitter.cpp +++ b/src/concurrency/GroupCommitter.cpp @@ -21,15 +21,14 @@ constexpr size_t kAligment = 4096; void GroupCommitter::runImpl() { CPUCounters::registerThread(mThreadName, false); - uint64_t minFlushedGSN = std::numeric_limits::max(); - uint64_t maxFlushedGSN = 0; - TXID minFlushedTxId = std::numeric_limits::max(); + TXID minFlushedSysTx = std::numeric_limits::max(); + TXID minFlushedUsrTx = std::numeric_limits::max(); std::vector numRfaTxs(mWorkerCtxs.size(), 0); std::vector walFlushReqCopies(mWorkerCtxs.size()); while (mKeepRunning) { // phase 1 - collectWalRecords(minFlushedGSN, maxFlushedGSN, minFlushedTxId, numRfaTxs, walFlushReqCopies); + collectWalRecords(minFlushedSysTx, minFlushedUsrTx, numRfaTxs, walFlushReqCopies); // phase 2 if (!mAIo.IsEmpty()) { @@ -37,25 +36,24 @@ void GroupCommitter::runImpl() { } // phase 3 - determineCommitableTx(minFlushedGSN, maxFlushedGSN, minFlushedTxId, numRfaTxs, - walFlushReqCopies); + determineCommitableTx(minFlushedSysTx, minFlushedUsrTx, numRfaTxs, walFlushReqCopies); } } -void GroupCommitter::collectWalRecords(uint64_t& minFlushedGSN, uint64_t& maxFlushedGSN, - TXID& minFlushedTxId, std::vector& numRfaTxs, +void GroupCommitter::collectWalRecords(TXID& minFlushedSysTx, TXID& minFlushedUsrTx, + std::vector& numRfaTxs, std::vector& walFlushReqCopies) { leanstore::telemetry::MetricOnlyTimer timer; SCOPED_DEFER({ METRIC_HIST_OBSERVE(mStore->mMetricsManager, group_committer_prep_iocbs_us, timer.ElaspedUs()); }); - minFlushedGSN = std::numeric_limits::max(); - maxFlushedGSN = 0; - minFlushedTxId = std::numeric_limits::max(); + minFlushedSysTx = std::numeric_limits::max(); + minFlushedUsrTx = std::numeric_limits::max(); - for (uint32_t workerId = 0; workerId < mWorkerCtxs.size(); workerId++) { + 
diff --git a/src/concurrency/GroupCommitter.cpp b/src/concurrency/GroupCommitter.cpp
index 0cbd9faa..5b0cb503 100644
--- a/src/concurrency/GroupCommitter.cpp
+++ b/src/concurrency/GroupCommitter.cpp
@@ -21,15 +21,14 @@ constexpr size_t kAligment = 4096;

 void GroupCommitter::runImpl() {
   CPUCounters::registerThread(mThreadName, false);

-  uint64_t minFlushedGSN = std::numeric_limits<uint64_t>::max();
-  uint64_t maxFlushedGSN = 0;
-  TXID minFlushedTxId = std::numeric_limits<TXID>::max();
+  TXID minFlushedSysTx = std::numeric_limits<TXID>::max();
+  TXID minFlushedUsrTx = std::numeric_limits<TXID>::max();
   std::vector<uint64_t> numRfaTxs(mWorkerCtxs.size(), 0);
   std::vector<WalFlushReq> walFlushReqCopies(mWorkerCtxs.size());

   while (mKeepRunning) {
     // phase 1
-    collectWalRecords(minFlushedGSN, maxFlushedGSN, minFlushedTxId, numRfaTxs, walFlushReqCopies);
+    collectWalRecords(minFlushedSysTx, minFlushedUsrTx, numRfaTxs, walFlushReqCopies);

     // phase 2
     if (!mAIo.IsEmpty()) {
@@ -37,25 +36,24 @@
     }

     // phase 3
-    determineCommitableTx(minFlushedGSN, maxFlushedGSN, minFlushedTxId, numRfaTxs,
-                          walFlushReqCopies);
+    determineCommitableTx(minFlushedSysTx, minFlushedUsrTx, numRfaTxs, walFlushReqCopies);
   }
 }

-void GroupCommitter::collectWalRecords(uint64_t& minFlushedGSN, uint64_t& maxFlushedGSN,
-                                       TXID& minFlushedTxId, std::vector<uint64_t>& numRfaTxs,
+void GroupCommitter::collectWalRecords(TXID& minFlushedSysTx, TXID& minFlushedUsrTx,
+                                       std::vector<uint64_t>& numRfaTxs,
                                        std::vector<WalFlushReq>& walFlushReqCopies) {
   leanstore::telemetry::MetricOnlyTimer timer;
   SCOPED_DEFER({
     METRIC_HIST_OBSERVE(mStore->mMetricsManager, group_committer_prep_iocbs_us, timer.ElaspedUs());
   });
-  minFlushedGSN = std::numeric_limits<uint64_t>::max();
-  maxFlushedGSN = 0;
-  minFlushedTxId = std::numeric_limits<TXID>::max();
+  minFlushedSysTx = std::numeric_limits<TXID>::max();
+  minFlushedUsrTx = std::numeric_limits<TXID>::max();

-  for (uint32_t workerId = 0; workerId < mWorkerCtxs.size(); workerId++) {
+  for (auto workerId = 0u; workerId < mWorkerCtxs.size(); workerId++) {
     auto& logging = mWorkerCtxs[workerId]->mLogging;
+    // collect logging info
     std::unique_lock guard(logging.mRfaTxToCommitMutex);
     numRfaTxs[workerId] = logging.mRfaTxToCommit.size();
@@ -65,15 +63,18 @@
     auto version = logging.mWalFlushReq.Get(walFlushReqCopies[workerId]);
     walFlushReqCopies[workerId].mVersion = version;
     const auto& reqCopy = walFlushReqCopies[workerId];
+
     if (reqCopy.mVersion == lastReqVersion) {
       // no transaction log write since last round group commit, skip.
       continue;
     }

-    // update GSN and commitTS info
-    maxFlushedGSN = std::max(maxFlushedGSN, reqCopy.mCurrGSN);
-    minFlushedGSN = std::min(minFlushedGSN, reqCopy.mCurrGSN);
-    minFlushedTxId = std::min(minFlushedTxId, reqCopy.mCurrTxId);
+    if (reqCopy.mSysTxWrittern > 0) {
+      minFlushedSysTx = std::min(minFlushedSysTx, reqCopy.mSysTxWrittern);
+    }
+    if (reqCopy.mCurrTxId > 0) {
+      minFlushedUsrTx = std::min(minFlushedUsrTx, reqCopy.mCurrTxId);
+    }

     // prepare IOCBs on demand
     const uint64_t buffered = reqCopy.mWalBuffered;
@@ -118,8 +119,7 @@ void GroupCommitter::flushWalRecords() {
   }
 }

-void GroupCommitter::determineCommitableTx(uint64_t minFlushedGSN, uint64_t maxFlushedGSN,
-                                           TXID minFlushedTxId,
+void GroupCommitter::determineCommitableTx(TXID minFlushedSysTx, TXID minFlushedUsrTx,
                                            const std::vector<uint64_t>& numRfaTxs,
                                            const std::vector<WalFlushReq>& walFlushReqCopies) {
   leanstore::telemetry::MetricOnlyTimer timer;
@@ -141,15 +141,14 @@
       uint64_t i = 0;
       for (; i < logging.mTxToCommit.size(); ++i) {
         auto& tx = logging.mTxToCommit[i];
-        if (!tx.CanCommit(minFlushedGSN, minFlushedTxId)) {
+        if (!tx.CanCommit(minFlushedSysTx, minFlushedUsrTx)) {
           break;
         }
         maxCommitTs = std::max(maxCommitTs, tx.mCommitTs);
         tx.mState = TxState::kCommitted;
-        LS_DLOG("Transaction with remote dependency committed"
-                ", workerId={}, startTs={}, commitTs={}, minFlushedGSN={}, "
-                "maxFlushedGSN={}, minFlushedTxId={}",
-                workerId, tx.mStartTs, tx.mCommitTs, minFlushedGSN, maxFlushedGSN, minFlushedTxId);
+        LS_DLOG("Transaction with remote dependency committed, workerId={}, startTs={}, "
+                "commitTs={}, minFlushedSysTx={}, minFlushedUsrTx={}",
+                workerId, tx.mStartTs, tx.mCommitTs, minFlushedSysTx, minFlushedUsrTx);
       }
       if (i > 0) {
         logging.mTxToCommit.erase(logging.mTxToCommit.begin(), logging.mTxToCommit.begin() + i);
@@ -165,12 +164,10 @@
         auto& tx = logging.mRfaTxToCommit[i];
         maxCommitTsRfa = std::max(maxCommitTsRfa, tx.mCommitTs);
         tx.mState = TxState::kCommitted;
-        LS_DLOG("Transaction without remote dependency committed"
-                ", workerId={}, startTs={}, commitTs={}, minFlushedGSN={}, "
-                "maxFlushedGSN={}, minFlushedTxId={}",
-                workerId, tx.mStartTs, tx.mCommitTs, minFlushedGSN, maxFlushedGSN, minFlushedTxId);
+        LS_DLOG("Transaction without remote dependency committed, workerId={}, startTs={}, "
+                "commitTs={}",
+                workerId, tx.mStartTs, tx.mCommitTs);
       }
-
       if (i > 0) {
         logging.mRfaTxToCommit.erase(logging.mRfaTxToCommit.begin(),
                                      logging.mRfaTxToCommit.begin() + i);
@@ -191,12 +188,7 @@
     }
   }

-  if (minFlushedGSN < std::numeric_limits<uint64_t>::max()) {
-    LS_DLOG("Group commit finished, minFlushedGSN={}, maxFlushedGSN={}", minFlushedGSN,
-            maxFlushedGSN);
-    mGlobalMinFlushedGSN.store(minFlushedGSN, std::memory_order_release);
-    mGlobalMaxFlushedGSN.store(maxFlushedGSN, std::memory_order_release);
-  }
+  mGlobalMinFlushedSysTx.store(minFlushedSysTx, std::memory_order_release);
 }

 void GroupCommitter::append(uint8_t* buf, uint64_t lower, uint64_t upper) {
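A transaction parked in mTxToCommit stays there until CanCommit(minFlushedSysTx, minFlushedUsrTx) holds, i.e. until every log it depends on is durable on all workers. The predicate itself is not part of this diff; one plausible formulation, with a simplified stand-in Transaction:

#include <cstdint>

using TXID = uint64_t;

// Plausible reading of the committability check (the real
// Transaction::CanCommit is not shown in this diff): a transaction with
// remote dependencies may commit once every system transaction it observed
// and every user transaction it may depend on are durably flushed.
struct Transaction {
  TXID mMaxObservedSysTxId; // highest system tx whose effects this tx saw
  TXID mStartTs;
  TXID mCommitTs;

  bool CanCommit(TXID minFlushedSysTx, TXID minFlushedUsrTx) const {
    return mMaxObservedSysTxId <= minFlushedSysTx && mStartTs <= minFlushedUsrTx;
  }
};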
diff --git a/src/concurrency/HistoryStorage.cpp b/src/concurrency/HistoryStorage.cpp
index 8ab42511..ff516d25 100644
--- a/src/concurrency/HistoryStorage.cpp
+++ b/src/concurrency/HistoryStorage.cpp
@@ -150,7 +150,8 @@ void HistoryStorage::PurgeVersions(TXID fromTxId, TXID toTxId,
       if (guardedLeaf->FreeSpaceAfterCompaction() >= BTreeNode::UnderFullSize()) {
         xIter.SetCleanUpCallback([&, toMerge = guardedLeaf.mBf] {
           JUMPMU_TRY() {
-            btree->TryMergeMayJump(*toMerge);
+            TXID sysTxId = btree->mStore->AllocSysTxTs();
+            btree->TryMergeMayJump(sysTxId, *toMerge);
           }
           JUMPMU_CATCH() {
           }
@@ -239,7 +240,8 @@
       if (guardedLeaf->FreeSpaceAfterCompaction() >= BTreeNode::UnderFullSize()) {
         xIter.SetCleanUpCallback([&, toMerge = guardedLeaf.mBf] {
           JUMPMU_TRY() {
-            btree->TryMergeMayJump(*toMerge);
+            TXID sysTxId = btree->mStore->AllocSysTxTs();
+            btree->TryMergeMayJump(sysTxId, *toMerge);
           }
           JUMPMU_CATCH() {
           }

diff --git a/src/concurrency/Logging.cpp b/src/concurrency/Logging.cpp
index 77e49f6e..40a43b81 100644
--- a/src/concurrency/Logging.cpp
+++ b/src/concurrency/Logging.cpp
@@ -59,9 +59,8 @@ void Logging::WriteWalTxAbort() {
   mWalBuffered += size;
   publishWalFlushReq();

-  LS_DLOG("WriteWalTxAbort, workerId={}, startTs={}, curGSN={}, walJson={}",
-          WorkerContext::My().mWorkerId, WorkerContext::My().mActiveTx.mStartTs, GetCurrentGsn(),
-          utils::ToJsonString(entry));
+  LS_DLOG("WriteWalTxAbort, workerId={}, startTs={}, walJson={}", WorkerContext::My().mWorkerId,
+          WorkerContext::My().mActiveTx.mStartTs, utils::ToJsonString(entry));
 }

 void Logging::WriteWalTxFinish() {
@@ -78,9 +77,8 @@
   mWalBuffered += size;
   publishWalFlushReq();

-  LS_DLOG("WriteWalTxFinish, workerId={}, startTs={}, curGSN={}, walJson={}",
-          WorkerContext::My().mWorkerId, WorkerContext::My().mActiveTx.mStartTs, GetCurrentGsn(),
-          utils::ToJsonString(entry));
+  LS_DLOG("WriteWalTxFinish, workerId={}, startTs={}, walJson={}", WorkerContext::My().mWorkerId,
+          WorkerContext::My().mActiveTx.mStartTs, utils::ToJsonString(entry));
 }

 void Logging::WriteWalCarriageReturn() {
@@ -101,9 +99,8 @@ void Logging::SubmitWALEntryComplex(uint64_t totalSize) {
   COUNTERS_BLOCK() {
     WorkerCounters::MyCounters().wal_write_bytes += totalSize;
   }
-  LS_DLOG("SubmitWal, workerId={}, startTs={}, curGSN={}, walJson={}",
-          WorkerContext::My().mWorkerId, WorkerContext::My().mActiveTx.mStartTs, GetCurrentGsn(),
-          utils::ToJsonString(mActiveWALEntryComplex));
+  LS_DLOG("SubmitWal, workerId={}, startTs={}, walJson={}", WorkerContext::My().mWorkerId,
+          WorkerContext::My().mActiveTx.mStartTs, utils::ToJsonString(mActiveWALEntryComplex));
 }

 void Logging::publishWalBufferedOffset() {
@@ -111,7 +108,7 @@
 }

 void Logging::publishWalFlushReq() {
-  WalFlushReq current(mWalBuffered, GetCurrentGsn(), WorkerContext::My().mActiveTx.mStartTs);
+  WalFlushReq current(mWalBuffered, mSysTxWrittern, WorkerContext::My().mActiveTx.mStartTs);
   mWalFlushReq.Set(current);
 }

diff --git a/src/concurrency/Recovery.cpp b/src/concurrency/Recovery.cpp
index 88c95aac..a5066fac 100644
--- a/src/concurrency/Recovery.cpp
+++ b/src/concurrency/Recovery.cpp
@@ -87,7 +87,7 @@ Result Recovery::analysis() {
       auto* wal = reinterpret_cast<WalEntryComplex*>(walEntryPtr);
       mActiveTxTable[wal->mTxId] = offset;
       auto& bf = resolvePage(wal->mPageId);
-      if (wal->mGsn >= bf.mPage.mGSN &&
+      if (wal->mPsn >= bf.mPage.mPsn &&
          mDirtyPageTable.find(wal->mPageId) == mDirtyPageTable.end()) {
         // record the first WalEntry that makes the page dirty
         auto pageId = wal->mPageId;
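publishWalFlushReq above now publishes mSysTxWrittern (the identifier keeps that spelling in the code), the highest system transaction this worker has written to its WAL buffer, in place of the old per-worker GSN. The shape of the request, as implied by its call sites in Logging.cpp and GroupCommitter.cpp (field names taken from the diff, layout assumed):

#include <cstdint>

using TXID = uint64_t;

// Per-worker flush request as implied by the call sites in this diff
// (publishWalFlushReq / collectWalRecords); the real struct carries more.
struct WalFlushReq {
  uint64_t mVersion = 0;     // bumped on every publish, lets the group
                             // committer skip workers with no new logs
  uint64_t mWalBuffered = 0; // WAL buffer offset to flush up to
  TXID mSysTxWrittern = 0;   // highest system tx written by this worker
  TXID mCurrTxId = 0;        // current user tx of this worker
};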
diff --git a/src/concurrency/WorkerContext.cpp b/src/concurrency/WorkerContext.cpp
index 8a028bc8..731b5487 100644
--- a/src/concurrency/WorkerContext.cpp
+++ b/src/concurrency/WorkerContext.cpp
@@ -51,11 +51,8 @@ void WorkerContext::StartTx(TxMode mode, IsolationLevel level, bool isReadOnly)
              "Previous transaction not ended, workerId={}, startTs={}, txState={}", mWorkerId,
              prevTx.mStartTs, TxStatUtil::ToString(prevTx.mState));
   SCOPED_DEFER({
-    LS_DLOG("Start transaction, workerId={}, startTs={}, txReadSnapshot(GSN)={}, "
-            "workerGSN={}, globalMinFlushedGSN={}, globalMaxFlushedGSN={}",
-            mWorkerId, mActiveTx.mStartTs, mLogging.mTxReadSnapshot, mLogging.GetCurrentGsn(),
-            mStore->mCRManager->mGroupCommitter->mGlobalMinFlushedGSN.load(),
-            mStore->mCRManager->mGroupCommitter->mGlobalMaxFlushedGSN.load());
+    LS_DLOG("Start transaction, workerId={}, startTs={}, globalMinFlushedSysTx={}", mWorkerId,
+            mActiveTx.mStartTs, mStore->mCRManager->mGroupCommitter->mGlobalMinFlushedSysTx.load());
   });

   mActiveTx.Start(mode, level);
@@ -64,36 +61,24 @@
     return;
   }

-  // Sync GSN clock with the global max flushed (observed) GSN, so that the
-  // global min flushed GSN can be advanced, transactions with remote dependency
-  // can be committed in time.
-  const auto maxFlushedGsn = mStore->mCRManager->mGroupCommitter->mGlobalMaxFlushedGSN.load();
-  if (maxFlushedGsn > mLogging.GetCurrentGsn()) {
-    mLogging.SetCurrentGsn(maxFlushedGsn);
-  }
+  //! Reset the max observed system transaction id
+  mActiveTx.mMaxObservedSysTxId = mStore->mCRManager->mGroupCommitter->mGlobalMinFlushedSysTx;

   // Init wal and group commit related transaction information
   mLogging.mTxWalBegin = mLogging.mWalBuffered;

-  // For remote dependency validation
-  mLogging.mTxReadSnapshot = mStore->mCRManager->mGroupCommitter->mGlobalMinFlushedGSN.load();
-  mLogging.mHasRemoteDependency = false;
-
   // For now, we only support SI and SSI
   if (level < IsolationLevel::kSnapshotIsolation) {
     Log::Fatal("Unsupported isolation level: {}", static_cast<uint8_t>(level));
   }

-  // Draw TXID from global counter and publish it with the TX type (i.e.
-  // long-running or short-running) We have to acquire a transaction id and use
-  // it for locking in ANY isolation level
-  //
-  // TODO(jian.z): Allocating transaction start ts globally heavily hurts the
-  // scalability, especially for read-only transactions
+  // Draw TXID from global counter and publish it with the TX type (i.e. long-running or
+  // short-running) We have to acquire a transaction id and use it for locking in ANY isolation
+  // level
   if (isReadOnly) {
-    mActiveTx.mStartTs = mStore->GetTs();
+    mActiveTx.mStartTs = mStore->GetUsrTxTs();
   } else {
-    mActiveTx.mStartTs = mStore->AllocTs();
+    mActiveTx.mStartTs = mStore->AllocUsrTxTs();
   }
   auto curTxId = mActiveTx.mStartTs;
   if (mStore->mStoreOption->mEnableLongRunningTx && mActiveTx.IsLongRunning()) {
@@ -119,7 +104,7 @@ void WorkerContext::CommitTx() {
   // Reset mCommandId on commit
   mCommandId = 0;
   if (mActiveTx.mHasWrote) {
-    mActiveTx.mCommitTs = mStore->AllocTs();
+    mActiveTx.mCommitTs = mStore->AllocUsrTxTs();
     mCc.mCommitTree.AppendCommitLog(mActiveTx.mStartTs, mActiveTx.mCommitTs);
     mCc.mLatestCommitTs.store(mActiveTx.mCommitTs, std::memory_order_release);
   } else {
@@ -140,30 +125,23 @@
     mLogging.WriteWalTxFinish();
   }

-  // update max observed GSN
-  mActiveTx.mMaxObservedGSN = mLogging.GetCurrentGsn();
-
-  if (mLogging.mHasRemoteDependency) {
-    // for group commit
+  // for group commit
+  if (mActiveTx.mHasRemoteDependency) {
     std::unique_lock g(mLogging.mTxToCommitMutex);
     mLogging.mTxToCommit.push_back(mActiveTx);
-    LS_DLOG("Puting transaction with remote dependency to mTxToCommit"
-            ", workerId={}, startTs={}, commitTs={}, maxObservedGSN={}",
-            mWorkerId, mActiveTx.mStartTs, mActiveTx.mCommitTs, mActiveTx.mMaxObservedGSN);
   } else {
-    // for group commit
     std::unique_lock g(mLogging.mRfaTxToCommitMutex);
-    CRCounters::MyCounters().rfa_committed_tx++;
     mLogging.mRfaTxToCommit.push_back(mActiveTx);
-    LS_DLOG("Puting transaction (RFA) to mRfaTxToCommit, workerId={}, "
-            "startTs={}, commitTs={}, maxObservedGSN={}",
-            mWorkerId, mActiveTx.mStartTs, mActiveTx.mCommitTs, mActiveTx.mMaxObservedGSN);
   }

   // Cleanup versions in history tree
   mCc.GarbageCollection();

   // Wait logs to be flushed
+  LS_DLOG("Wait transaction to commit, workerId={}, startTs={}, commitTs={}, maxObservedSysTx={}, "
+          "hasRemoteDep={}",
+          mWorkerId, mActiveTx.mStartTs, mActiveTx.mCommitTs, mActiveTx.mMaxObservedSysTxId,
+          mActiveTx.mHasRemoteDependency);
   telemetry::MetricOnlyTimer timer;
   while (!mLogging.SafeToCommit(mActiveTx.mCommitTs)) {
   }
@@ -186,9 +164,8 @@ void WorkerContext::AbortTx() {
     mActiveTx.mState = TxState::kAborted;
     METRIC_COUNTER_INC(mStore->mMetricsManager, tx_abort_total, 1);
     mActiveTxId.store(0, std::memory_order_release);
-    Log::Info("Transaction aborted, workerId={}, startTs={}, commitTs={}, "
-              "maxObservedGSN={}",
-              mWorkerId, mActiveTx.mStartTs, mActiveTx.mCommitTs, mActiveTx.mMaxObservedGSN);
+    Log::Info("Transaction aborted, workerId={}, startTs={}, commitTs={}, maxObservedSysTx={}",
+              mWorkerId, mActiveTx.mStartTs, mActiveTx.mCommitTs, mActiveTx.mMaxObservedSysTxId);
   });

   if (!(mActiveTx.mState == TxState::kStarted && mActiveTx.mIsDurable)) {
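CommitTx enqueues the transaction into one of two queues: mTxToCommit when it has a remote dependency, mRfaTxToCommit when remote-flush-avoidance applies. Both queues are appended in commit-timestamp order, which is why determineCommitableTx can commit a prefix and stop at the first blocked entry. A condensed model with simplified types (LeanStore's real queues live in Logging and hold full Transaction objects):

#include <cstddef>
#include <cstdint>
#include <deque>

struct Tx {
  uint64_t mCommitTs;
  bool committed = false;
};

// Draining a prefix: entries are in commit-timestamp order, so the group
// committer can stop at the first transaction that is not yet committable
// and erase only the committed prefix.
template <typename Pred>
void DrainPrefix(std::deque<Tx>& queue, Pred canCommit) {
  std::size_t i = 0;
  while (i < queue.size() && canCommit(queue[i])) {
    queue[i].committed = true;
    i++;
  }
  queue.erase(queue.begin(), queue.begin() + i);
}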
diff --git a/src/leanstore-c/StoreOption.cpp b/src/leanstore-c/StoreOption.cpp
index 61df1a9b..f28fb9a3 100644
--- a/src/leanstore-c/StoreOption.cpp
+++ b/src/leanstore-c/StoreOption.cpp
@@ -33,7 +33,7 @@ static const StoreOption kDefaultStoreOption = {

     // Generic BTree related options
     .mEnableBulkInsert = false,
-    .mEnableXMerge = false,
+    .mEnableXMerge = true,
     .mXMergeK = 5,
     .mXMergeTargetPct = 80,
     .mEnableContentionSplit = true,

diff --git a/src/utils/ToJson.hpp b/src/utils/ToJson.hpp
index 65da0dbd..11a1b818 100644
--- a/src/utils/ToJson.hpp
+++ b/src/utils/ToJson.hpp
@@ -17,7 +17,7 @@
 const char kType[] = "mType";
 const char kTxId[] = "mTxId";
 const char kWorkerId[] = "mWorkerId";
 const char kPrevLsn[] = "mPrevLsn";
-const char kGsn[] = "mGsn";
+const char kPsn[] = "mPsn";
 const char kTreeId[] = "mTreeId";
 const char kPageId[] = "mPageId";

@@ -132,8 +132,8 @@ inline void ToJson(const leanstore::cr::WalEntryComplex* obj, rapidjson::Documen
   // psn
   {
     rapidjson::Value member;
-    member.SetUint64(obj->mGsn);
-    doc->AddMember(kGsn, member, doc->GetAllocator());
+    member.SetUint64(obj->mPsn);
+    doc->AddMember(kPsn, member, doc->GetAllocator());
   }

   // treeId
@@ -196,8 +196,8 @@ inline void ToJson(leanstore::storage::BufferFrame* obj, rapidjson::Value* doc,

   {
     rapidjson::Value member;
-    member.SetUint64(obj->mHeader.mFlushedGsn);
-    headerObj.AddMember("mFlushedGsn", member, *allocator);
+    member.SetUint64(obj->mHeader.mFlushedPsn);
+    headerObj.AddMember("mFlushedPsn", member, *allocator);
   }

   {
@@ -215,6 +215,11 @@
     member.SetUint64(obj->mPage.mGSN);
     pageMetaObj.AddMember("mGSN", member, *allocator);
   }
+  {
+    rapidjson::Value member;
+    member.SetUint64(obj->mPage.mPsn);
+    pageMetaObj.AddMember("mPsn", member, *allocator);
+  }
   {
     rapidjson::Value member;
     member.SetUint64(obj->mPage.mBTreeId);
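For readers unfamiliar with the rapidjson idiom that ToJson.hpp uses for every field (populate a Value, then move it into the Document with the document's allocator), a standalone example that compiles against rapidjson as-is:

#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

#include <iostream>

// Build {"mPsn":42} and serialize it, mirroring the pattern above. The field
// name and value are illustrative only.
int main() {
  rapidjson::Document doc;
  doc.SetObject();

  rapidjson::Value member;
  member.SetUint64(42); // e.g. a page's mPsn
  doc.AddMember("mPsn", member, doc.GetAllocator());

  rapidjson::StringBuffer buffer;
  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
  doc.Accept(writer);
  std::cout << buffer.GetString() << "\n"; // {"mPsn":42}
  return 0;
}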
diff --git a/tests/LongRunningTxTest.cpp b/tests/LongRunningTxTest.cpp
index 7d11f20f..9cc81ac4 100644
--- a/tests/LongRunningTxTest.cpp
+++ b/tests/LongRunningTxTest.cpp
@@ -35,7 +35,7 @@ class LongRunningTxTest : public ::testing::Test {
     // Create a leanstore instance for the test case
     auto* curTest = ::testing::UnitTest::GetInstance()->current_test_info();
     auto curTestName = std::string(curTest->test_case_name()) + "_" + std::string(curTest->name());
-    auto storeDirStr = std::string("/tmp/") + curTestName;
+    auto storeDirStr = std::string("/tmp/leanstore/") + curTestName;
     StoreOption* option = CreateStoreOption(storeDirStr.c_str());
     option->mCreateFromScratch = true;

diff --git a/tests/MvccTest.cpp b/tests/MvccTest.cpp
index 85af0a9a..c390956e 100644
--- a/tests/MvccTest.cpp
+++ b/tests/MvccTest.cpp
@@ -27,7 +27,7 @@ class MvccTest : public ::testing::Test {
   MvccTest() {
     auto* curTest = ::testing::UnitTest::GetInstance()->current_test_info();
     auto curTestName = std::string(curTest->test_case_name()) + "_" + std::string(curTest->name());
-    auto storeDirStr = "/tmp/" + curTestName;
+    auto storeDirStr = "/tmp/leanstore/" + curTestName;
     StoreOption* option = CreateStoreOption(storeDirStr.c_str());
     option->mCreateFromScratch = true;
     option->mWorkerThreads = 3;

diff --git a/tests/OptimisticGuardedTest.cpp b/tests/OptimisticGuardedTest.cpp
index 9fd472a6..85794570 100644
--- a/tests/OptimisticGuardedTest.cpp
+++ b/tests/OptimisticGuardedTest.cpp
@@ -26,7 +26,7 @@ class OptimisticGuardedTest : public ::testing::Test {
   void SetUp() override {
     auto* curTest = ::testing::UnitTest::GetInstance()->current_test_info();
     auto curTestName = std::string(curTest->test_case_name()) + "_" + std::string(curTest->name());
-    auto storeDirStr = "/tmp/" + curTestName;
+    auto storeDirStr = "/tmp/leanstore/" + curTestName;
     auto* option = CreateStoreOption(storeDirStr.c_str());
     option->mCreateFromScratch = true;
     option->mWorkerThreads = 2;
diff --git a/tests/RecoveryTest.cpp b/tests/RecoveryTest.cpp
index 3ef4b7c2..abc52c64 100644
--- a/tests/RecoveryTest.cpp
+++ b/tests/RecoveryTest.cpp
@@ -12,6 +12,9 @@
 #include "leanstore/utils/RandomGenerator.hpp"

 #include
+#include
+#include
+#include
 #include
 #include

@@ -31,7 +34,7 @@ class RecoveryTest : public ::testing::Test {
     // Create a leanstore instance for the test case
     auto* curTest = ::testing::UnitTest::GetInstance()->current_test_info();
     auto curTestName = std::string(curTest->test_case_name()) + "_" + std::string(curTest->name());
-    auto storeDirStr = "/tmp/" + curTestName;
+    auto storeDirStr = "/tmp/leanstore/" + curTestName;
     auto* option = CreateStoreOption(storeDirStr.c_str());
     option->mCreateFromScratch = true;
     option->mWorkerThreads = 2;
@@ -153,6 +156,18 @@ TEST_F(RecoveryTest, RecoverAfterInsert) {
     cr::WorkerContext::My().CommitTx();
   });

+  // print btree before destroy
+  {
+    rapidjson::Document doc;
+    doc.SetObject();
+    BTreeGeneric::ToJson(*btree, &doc);
+    // print the doc string
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    doc.Accept(writer);
+    std::cout << std::format("BTree before destroy: {}", buffer.GetString());
+  }
+
   // skip dumpping buffer frames on exit
   LS_DEBUG_ENABLE(mStore, "skip_CheckpointAllBufferFrames");
   SCOPED_DEFER({ LS_DEBUG_DISABLE(mStore, "skip_CheckpointAllBufferFrames"); });
@@ -169,6 +184,18 @@
   mStore->GetTransactionKV(btreeName, &btree);
   EXPECT_NE(btree, nullptr);

+  // print btree after recovery
+  // {
+  //   rapidjson::Document doc;
+  //   doc.SetObject();
+  //   BTreeGeneric::ToJson(*btree, &doc);
+  //   // print the doc string
+  //   rapidjson::StringBuffer buffer;
+  //   rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  //   doc.Accept(writer);
+  //   Log::Info("BTree after recovery: {}", buffer.GetString());
+  // }
+
   // lookup the restored btree
   mStore->ExecSync(0, [&]() {
     cr::WorkerContext::My().StartTx();
diff --git a/tests/TransactionKVTest.cpp b/tests/TransactionKVTest.cpp
index 57ad5d6f..94182aae 100644
--- a/tests/TransactionKVTest.cpp
+++ b/tests/TransactionKVTest.cpp
@@ -12,6 +12,7 @@
 #include
 #include
+#include
 #include
 #include
 #include

@@ -31,7 +32,7 @@ class TransactionKVTest : public ::testing::Test {
   TransactionKVTest() {
     auto* curTest = ::testing::UnitTest::GetInstance()->current_test_info();
     auto curTestName = std::string(curTest->test_case_name()) + "_" + std::string(curTest->name());
-    auto* option = CreateStoreOption(("/tmp/" + curTestName).c_str());
+    auto* option = CreateStoreOption(("/tmp/leanstore/" + curTestName).c_str());
     option->mCreateFromScratch = true;
     option->mWorkerThreads = 3;
     option->mEnableEagerGc = true;
@@ -855,4 +856,55 @@ TEST_F(TransactionKVTest, InsertAfterRemoveDifferentWorkers) {
   });
 }

+TEST_F(TransactionKVTest, ConcurrentInsertWithSplit) {
+  // prepare a btree for insert
+  storage::btree::TransactionKV* btree;
+  mStore->ExecSync(0, [&]() {
+    auto res = mStore->CreateTransactionKV(
+        ::testing::UnitTest::GetInstance()->current_test_info()->name());
+    btree = res.value();
+    EXPECT_NE(btree, nullptr);
+  });
+
+  std::atomic stop = false;
+  auto keySize = 24;
+  auto valSize = 120;
+
+  // insert in worker 0 asynchronously
+  mStore->ExecAsync(0, [&]() {
+    for (auto i = 0; !stop; i++) {
+      cr::WorkerContext::My().StartTx();
+      SCOPED_DEFER(cr::WorkerContext::My().CommitTx());
+      auto key = std::format("{}_{}_{}", RandomGenerator::RandAlphString(keySize), 0, i);
+      auto val = RandomGenerator::RandAlphString(valSize);
+      auto res = btree->Insert(key, val);
+      EXPECT_EQ(res, OpCode::kOK);
+    }
+  });
+
+  // insert in worker 1 asynchronously
+  mStore->ExecAsync(1, [&]() {
+    for (auto i = 0; !stop; i++) {
+      cr::WorkerContext::My().StartTx();
+      SCOPED_DEFER(cr::WorkerContext::My().CommitTx());
+      auto key = std::format("{}_{}_{}", RandomGenerator::RandAlphString(keySize), 1, i);
+      auto val = RandomGenerator::RandAlphString(valSize);
+      auto res = btree->Insert(key, val);
+      EXPECT_EQ(res, OpCode::kOK);
+    }
+  });
+
+  // sleep for 2 seconds
+  std::this_thread::sleep_for(std::chrono::seconds(2));
+  stop.store(true);
+  mStore->Wait(0);
+  mStore->Wait(1);
+
+  // check the btree summary
+  mStore->ExecSync(0, [&]() {
+    auto summary = btree->Summary();
+    EXPECT_EQ(summary, "");
+  });
+}
+
 } // namespace leanstore::test

diff --git a/tests/btree/BTreeGegericTest.cpp b/tests/btree/BTreeGegericTest.cpp
index fd0fc736..d16219b4 100644
--- a/tests/btree/BTreeGegericTest.cpp
+++ b/tests/btree/BTreeGegericTest.cpp
@@ -26,7 +26,7 @@ class BTreeGenericTest : public ::testing::Test {
   BTreeGenericTest() {
     auto* curTest = ::testing::UnitTest::GetInstance()->current_test_info();
     auto curTestName = std::string(curTest->test_case_name()) + "_" + std::string(curTest->name());
-    auto storeDirStr = "/tmp/leanstore-test/" + curTestName;
+    auto storeDirStr = "/tmp/leanstore/" + curTestName;
     StoreOption* option = CreateStoreOption(storeDirStr.c_str());
     option->mCreateFromScratch = true;
     option->mWorkerThreads = 2;
@@ -70,8 +70,8 @@ TEST_F(BTreeGenericTest, GetSummary) {
   auto* btree = dynamic_cast<BTreeGeneric*>(mBTree);
   ASSERT_NE(btree, nullptr);

-  EXPECT_EQ(btree->Summary(), "entries=200, nodes=18, innerNodes=1, spacePct=0.00, height=2, "
-                              "rootSlots=16, freeSpaceAfterCompaction=24569");
+  EXPECT_EQ(btree->Summary(), "entries=200, nodes=19, innerNodes=1, spacePct=0.00, height=2, "
+                              "rootSlots=17, freeSpaceAfterCompaction=28375");
 }

 } // namespace leanstore::test
\ No newline at end of file

diff --git a/tests/btree/BTreeWalPayloadTest.cpp b/tests/btree/BTreeWalPayloadTest.cpp
index c6446380..cc6d6def 100644
--- a/tests/btree/BTreeWalPayloadTest.cpp
+++ b/tests/btree/BTreeWalPayloadTest.cpp
@@ -79,7 +79,7 @@ TEST_F(BTreeWalPayloadTest, ToJson) {
   EXPECT_TRUE(walStr.contains("kWalTxRemove"));
   EXPECT_TRUE(walStr.contains("Not implemented"));

-  wal = std::make_unique<WalInitPage>(0, false);
+  wal = std::make_unique<WalInitPage>(0, 0, false);
   walStr = WalPayload::ToJsonString(wal.get());
   EXPECT_TRUE(walStr.contains("mType"));
   EXPECT_TRUE(walStr.contains("kWalInitPage"));
@@ -87,14 +87,14 @@
   EXPECT_TRUE(walStr.contains("mIsLeaf"));

   BTreeNode::SeparatorInfo sepInfo;
-  wal = std::make_unique<WalSplitRoot>(0, 0, 0, sepInfo);
+  wal = std::make_unique<WalSplitRoot>(0, 0, 0, 0, sepInfo);
   walStr = WalPayload::ToJsonString(wal.get());
   EXPECT_TRUE(walStr.contains("mType"));
   EXPECT_TRUE(walStr.contains("kWalSplitRoot"));
   EXPECT_TRUE(walStr.contains("mNewLeft"));
   EXPECT_TRUE(walStr.contains("mNewRoot"));

-  wal = std::make_unique<WalSplitNonRoot>(0, 0, sepInfo);
+  wal = std::make_unique<WalSplitNonRoot>(0, 0, 0, sepInfo);
   walStr = WalPayload::ToJsonString(wal.get());
   EXPECT_TRUE(walStr.contains("mType"));
   EXPECT_TRUE(walStr.contains("kWalSplitNonRoot"));
diff --git a/tests/btree/BasicKVTest.cpp b/tests/btree/BasicKVTest.cpp
index 3afa9e28..75e1d9f8 100644
--- a/tests/btree/BasicKVTest.cpp
+++ b/tests/btree/BasicKVTest.cpp
@@ -5,7 +5,6 @@
 #include "leanstore/btree/TransactionKV.hpp"
 #include "leanstore/buffer-manager/BufferManager.hpp"
 #include "leanstore/concurrency/CRManager.hpp"
-#include "leanstore/utils/Defer.hpp"

 #include

@@ -13,7 +12,6 @@
 #include
 #include
 #include
-#include
 #include
 #include

@@ -311,33 +309,6 @@ TEST_F(BasicKVTest, SameKeyInsertRemoveMultiTimes) {
     kvToTest.emplace_back(std::move(key), std::move(val));
   }

-  // // start a new thread, remove-insert the key-values to the btree
-  // std::atomic stop{false};
-  // std::thread t1([&]() {
-  //   while (!stop) {
-  //     for (const auto& [key, val] : kvToTest) {
-  //       mStore->ExecSync(0, [&]() { EXPECT_EQ(btree->Remove(key), OpCode::kOK); });
-  //       mStore->ExecSync(0, [&]() { EXPECT_EQ(btree->Insert(key, val), OpCode::kOK); });
-  //     }
-  //   }
-  // });
-
-  // // start another thread, remove-insert the key-values to the btree
-  // std::thread t2([&]() {
-  //   while (!stop) {
-  //     for (const auto& [key, val] : kvToTest) {
-  //       mStore->ExecSync(1, [&]() { EXPECT_EQ(btree->Remove(key), OpCode::kOK); });
-  //       mStore->ExecSync(1, [&]() { EXPECT_EQ(btree->Insert(key, val), OpCode::kOK); });
-  //     }
-  //   }
-  // });
-
-  // // sleep for 1 seconds
-  // std::this_thread::sleep_for(std::chrono::seconds(1));
-  // stop = true;
-  // t1.join();
-  // t2.join();
-
   // 1. remove the key-values from the btree
   // 2. insert the key-values to the btree again
   const auto& [key, val] = kvToTest[numKVs / 2];
@@ -361,8 +332,8 @@
     }
   });

-  // sleep for 1 seconds
-  std::this_thread::sleep_for(std::chrono::seconds(20));
+  // sleep for 2 seconds
+  std::this_thread::sleep_for(std::chrono::seconds(2));
   stop = true;
   t1.join();
   t2.join();

diff --git a/tests/buffer-manager/AsyncWriteBufferTest.cpp b/tests/buffer-manager/AsyncWriteBufferTest.cpp
index ce5ce1d5..8cdc15af 100644
--- a/tests/buffer-manager/AsyncWriteBufferTest.cpp
+++ b/tests/buffer-manager/AsyncWriteBufferTest.cpp
@@ -109,10 +109,10 @@ TEST_F(AsyncWriteBufferTest, Basic) {

   // check the flushed content
   testWriteBuffer.IterateFlushedBfs(
-      [](BufferFrame& flushedBf, uint64_t flushedGsn) {
+      [](BufferFrame& flushedBf, uint64_t flushedPsn) {
         EXPECT_FALSE(flushedBf.IsDirty());
         EXPECT_FALSE(flushedBf.IsFree());
-        EXPECT_EQ(flushedGsn, 0);
+        EXPECT_EQ(flushedPsn, 0);
       },
       testMaxBatchSize);

diff --git a/tests/buffer-manager/PageEvictorTest.cpp b/tests/buffer-manager/PageEvictorTest.cpp
index 59053172..b1bb6183 100644
--- a/tests/buffer-manager/PageEvictorTest.cpp
+++ b/tests/buffer-manager/PageEvictorTest.cpp
@@ -20,7 +20,7 @@ class PageEvictorTest : public ::testing::Test {
     auto curTestName = std::string(curTest->test_case_name()) + "_" + std::string(curTest->name());
     const int pageSize = 4096;
     const int pageHeaderSize = 512;
-    auto storeDirStr = "/tmp/" + curTestName;
+    auto storeDirStr = "/tmp/leanstore/" + curTestName;
     auto* option = CreateStoreOption(storeDirStr.c_str());
     option->mCreateFromScratch = true;
     option->mLogLevel = LogLevel::kDebug;

diff --git a/tests/concurrency/WalEntryTest.cpp b/tests/concurrency/WalEntryTest.cpp
index 14466308..dfb52963 100644
--- a/tests/concurrency/WalEntryTest.cpp
+++ b/tests/concurrency/WalEntryTest.cpp
@@ -53,7 +53,7 @@ TEST_F(WalEntryTest, ToJsonString) {
   EXPECT_TRUE(walStr.contains(kTxId));
   EXPECT_TRUE(walStr.contains(kWorkerId));
   EXPECT_TRUE(walStr.contains(kPrevLsn));
-  EXPECT_TRUE(walStr.contains(kGsn));
+  EXPECT_TRUE(walStr.contains(kPsn));
   EXPECT_TRUE(walStr.contains(kTreeId));
   EXPECT_TRUE(walStr.contains(kPageId));
 }
diff --git a/tests/sync/ScopedHybridGuardTest.cpp b/tests/sync/ScopedHybridGuardTest.cpp
index 2ed07c8d..faec5597 100644
--- a/tests/sync/ScopedHybridGuardTest.cpp
+++ b/tests/sync/ScopedHybridGuardTest.cpp
@@ -28,7 +28,7 @@ class ScopedHybridGuardTest : public ::testing::Test {
   ScopedHybridGuardTest() {
     auto* curTest = ::testing::UnitTest::GetInstance()->current_test_info();
     auto curTestName = std::string(curTest->test_case_name()) + "_" + std::string(curTest->name());
-    auto* option = CreateStoreOption(("/tmp/" + curTestName).c_str());
+    auto* option = CreateStoreOption(("/tmp/leanstore/" + curTestName).c_str());
     option->mCreateFromScratch = true;
     option->mWorkerThreads = 2;
     option->mEnableEagerGc = true;