From 050dc473c1685aca02d1aee202af83032c840cc2 Mon Sep 17 00:00:00 2001 From: valdok Date: Sun, 19 Dec 2021 14:07:03 +0200 Subject: [PATCH 01/12] Node/Mining: create mining job only if the fee increases --- node/node.cpp | 50 +++++++++++++++++++++++++++++++++++--------------- node/node.h | 2 ++ 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/node/node.cpp b/node/node.cpp index a7c891201..23b9eb7d0 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -4189,6 +4189,7 @@ void Node::Miner::OnRefresh(uint32_t iIdx) void Node::Miner::HardAbortSafe() { m_pTaskToFinalize.reset(); + m_FeesTrg = 0; std::scoped_lock scope(m_Mutex); @@ -4279,6 +4280,9 @@ bool Node::Miner::Restart() return false; } + if (!IsShouldMine(bc)) + return false; + Task::Ptr pTask(std::make_shared()); Cast::Down(*pTask) = std::move(bc); @@ -4303,34 +4307,50 @@ bool Node::Miner::Restart() return true; } +bool Node::Miner::IsShouldMine(const NodeProcessor::GeneratedBlock& bc) +{ + if (bc.m_Fees >= m_FeesTrg) + return true; + + LOG_INFO() << "Block generation no change"; + return false; +} + void Node::Miner::StartMining(Task::Ptr&& pTask) { assert(pTask && !m_pTaskToFinalize); const NodeProcessor::GeneratedBlock& x = *pTask; + if (!IsShouldMine(x)) + return; + LOG_INFO() << "Block generated: Height=" << x.m_Hdr.m_Height << ", Fee=" << x.m_Fees << ", Difficulty=" << x.m_Hdr.m_PoW.m_Difficulty << ", Size=" << (x.m_BodyP.size() + x.m_BodyE.size()); + m_FeesTrg = x.m_Fees + 1; + pTask->m_hvNonceSeed = get_ParentObj().NextNonce(); // let's mine it. - std::scoped_lock scope(m_Mutex); - - if (m_pTask) { - if (*m_pTask->m_pStop) - return; // block already mined, probably notification to this thread on its way. Ignore the newly-constructed block - pTask->m_pStop = m_pTask->m_pStop; // use the same soft-restart indicator - } - else - { - pTask->m_pStop.reset(new volatile bool); - *pTask->m_pStop = false; - } + std::scoped_lock scope(m_Mutex); + + if (m_pTask) + { + if (*m_pTask->m_pStop) + return; // block already mined, probably notification to this thread on its way. Ignore the newly-constructed block + pTask->m_pStop = m_pTask->m_pStop; // use the same soft-restart indicator + } + else + { + pTask->m_pStop.reset(new volatile bool); + *pTask->m_pStop = false; + } - m_pTask = std::move(pTask); + m_pTask = std::move(pTask); - for (size_t i = 0; i < m_vThreads.size(); i++) - m_vThreads[i].m_pEvt->post(); + for (size_t i = 0; i < m_vThreads.size(); i++) + m_vThreads[i].m_pEvt->post(); + } OnRefreshExternal(); } diff --git a/node/node.h b/node/node.h index 44f097838..155070ac3 100644 --- a/node/node.h +++ b/node/node.h @@ -703,6 +703,7 @@ struct Node void OnMined(); IExternalPOW::BlockFoundResult OnMinedExternal(); void OnFinalizerChanged(Peer*); + bool IsShouldMine(const NodeProcessor::GeneratedBlock&); void HardAbortSafe(); bool Restart(); @@ -726,6 +727,7 @@ struct Node io::Timer::Ptr m_pTimer; bool m_bTimerPending = false; + Amount m_FeesTrg = 0; void OnTimer(); void SetTimer(uint32_t timeout_ms, bool bHard); From 943456cbe2c305f14c941890e9935820b4cc677b Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 00:05:19 +0200 Subject: [PATCH 02/12] Node: TxPool improvement, WIP --- node/node.cpp | 86 +++++++++++++++++++----------------- node/node.h | 6 +-- node/processor.cpp | 12 ++--- node/txpool.cpp | 49 ++++++++++---------- node/txpool.h | 31 ++++++------- node/unittests/node_test.cpp | 5 ++- 6 files changed, 97 insertions(+), 92 deletions(-) diff --git a/node/node.cpp b/node/node.cpp index 23b9eb7d0..ce5ac45eb 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -577,7 +577,7 @@ void Node::DeleteOutdated() Transaction& tx = *x.m_pValue; uint32_t nBvmCharge = 0; - if (proto::TxStatus::Ok != m_Processor.ValidateTxContextEx(tx, x.m_Height, true, nBvmCharge, nullptr, nullptr)) + if (proto::TxStatus::Ok != m_Processor.ValidateTxContextEx(tx, x.m_Profit.m_Stats.m_Hr, true, nBvmCharge, nullptr, nullptr)) m_TxPool.SetOutdated(x, m_Processor.m_Cursor.m_ID.m_Height); } @@ -587,10 +587,10 @@ void Node::DeleteOutdated() assert(MaxHeight == x.m_Confirm.m_Height); uint32_t nBvmCharge = 0; - uint8_t nStatus = m_Processor.ValidateTxContextEx(*x.m_pValue, x.m_Height, true, nBvmCharge, nullptr, nullptr); + uint8_t nStatus = m_Processor.ValidateTxContextEx(*x.m_pValue, x.m_Profit.m_Stats.m_Hr, true, nBvmCharge, nullptr, nullptr); if (proto::TxStatus::Ok != nStatus) { - bool bDone = x.m_Height.IsInRange(m_Processor.m_Cursor.m_ID.m_Height + 1); + bool bDone = x.m_Profit.m_Stats.m_Hr.IsInRange(m_Processor.m_Cursor.m_ID.m_Height + 1); LogTxStem(*x.m_pValue, bDone ? "confirmed without fluff" : "outdated"); m_Dandelion.Delete(x); @@ -611,7 +611,7 @@ void Node::DeleteOutdated() assert(!x.m_Time.m_Value); uint32_t nBvmCharge = 0; - uint8_t nStatus = m_Processor.ValidateTxContextEx(*x.m_pValue, x.m_Height, true, nBvmCharge, nullptr, nullptr); + uint8_t nStatus = m_Processor.ValidateTxContextEx(*x.m_pValue, x.m_Stats.m_Hr, true, nBvmCharge, nullptr, nullptr); if (proto::TxStatus::Ok == nStatus) { LogTxStem(*x.m_pValue, "Not confirmed, fluffing"); @@ -619,7 +619,7 @@ void Node::DeleteOutdated() } else { - bool bDone = x.m_Height.IsInRange(m_Processor.m_Cursor.m_ID.m_Height + 1); + bool bDone = x.m_Profit.m_Stats.m_Hr.IsInRange(m_Processor.m_Cursor.m_ID.m_Height + 1); LogTxStem(*x.m_pValue, bDone ? "confirm done" : "outdated"); m_Dandelion.Delete(x); @@ -2234,13 +2234,23 @@ uint8_t Node::OnTransaction(Transaction::Ptr&& pTx, std::unique_ptrm_vKernels.size(); i++) @@ -2453,7 +2460,7 @@ uint8_t Node::OnTransactionStem(Transaction::Ptr&& ptx, std::ostream* pExtraInfo if (!bTested) { - uint8_t nCode = ValidateTx(ctx, *ptx, nSizeCorrection, feeReserve, pExtraInfo); + uint8_t nCode = ValidateTx(stats, *ptx, pExtraInfo); if (proto::TxStatus::Ok != nCode) return nCode; @@ -2468,21 +2475,18 @@ uint8_t Node::OnTransactionStem(Transaction::Ptr&& ptx, std::ostream* pExtraInfo { if (!bTested) { - uint8_t nCode = ValidateTx(ctx, *ptx, nSizeCorrection, feeReserve, pExtraInfo); + uint8_t nCode = ValidateTx(stats, *ptx, pExtraInfo); if (proto::TxStatus::Ok != nCode) return nCode; } - AddDummyInputs(*ptx); + AddDummyInputs(*ptx, stats); auto pGuard = std::make_unique(); pGuard->m_bAggregating = false; pGuard->m_Time.m_Value = 0; - pGuard->m_Profit.m_Fee = ctx.m_Stats.m_Fee; - pGuard->m_Profit.SetSize(*ptx, nSizeCorrection); + pGuard->m_Profit.m_Stats = stats; pGuard->m_pValue.swap(ptx); - pGuard->m_Height = ctx.m_Height; - pGuard->m_FeeReserve = feeReserve; m_Dandelion.InsertKrn(*pGuard); @@ -2562,6 +2566,7 @@ void Node::PerformAggregation(TxPool::Stem::Element& x) { assert(x.m_bAggregating); + bool bModified = false; // Aggregation policiy: first select those with worse profit, than those with better TxPool::Stem::ProfitSet::iterator it = TxPool::Stem::ProfitSet::s_iterator_to(x.m_Profit); ++it; @@ -2574,7 +2579,8 @@ void Node::PerformAggregation(TxPool::Stem::Element& x) TxPool::Stem::Element& src = it->get_ParentObj(); ++it; - m_Dandelion.TryMerge(x, src); + if (m_Dandelion.TryMerge(x, src)) + bModified = true; } it = TxPool::Stem::ProfitSet::s_iterator_to(x.m_Profit); @@ -2589,13 +2595,20 @@ void Node::PerformAggregation(TxPool::Stem::Element& x) if (!bEnd) --it; - m_Dandelion.TryMerge(x, src); + if (m_Dandelion.TryMerge(x, src)) + bModified = true; if (bEnd) break; } } + if (bModified) + { + m_Dandelion.DeleteAggr(x); + m_Dandelion.InsertAggr(x); + } + LogTxStem(*x.m_pValue, "Aggregated so far"); if (x.m_pValue->m_vOutputs.size() >= m_Cfg.m_Dandelion.m_OutputsMin) @@ -2607,7 +2620,7 @@ void Node::PerformAggregation(TxPool::Stem::Element& x) } } -void Node::AddDummyInputs(Transaction& tx) +void Node::AddDummyInputs(Transaction& tx, TxPool::Stats& stats) { if (!m_Keys.m_pMiner) return; @@ -2638,6 +2651,7 @@ void Node::AddDummyInputs(Transaction& tx) if (bModified) { + stats.SetSize(tx); m_Processor.FlushDB(); // make sure they're not lost tx.Normalize(); } @@ -2696,7 +2710,7 @@ bool Node::AddDummyInputRaw(Transaction& tx, const CoinID& cid) return true; } -void Node::AddDummyOutputs(Transaction& tx, Amount feeReserve) +void Node::AddDummyOutputs(Transaction& tx, TxPool::Stats& stats) { if (!m_Cfg.m_Dandelion.m_DummyLifetimeHi || !m_Keys.m_pMiner) return; @@ -2708,7 +2722,7 @@ void Node::AddDummyOutputs(Transaction& tx, Amount feeReserve) while (tx.m_vOutputs.size() < m_Cfg.m_Dandelion.m_OutputsMin) { - if (feeReserve < fs.m_Output) + if (stats.m_FeeReserve < fs.m_Output) break; CoinID cid(Zero); @@ -2736,11 +2750,12 @@ void Node::AddDummyOutputs(Transaction& tx, Amount feeReserve) sk = -sk; tx.m_Offset = ECC::Scalar::Native(tx.m_Offset) + sk; - feeReserve -= fs.m_Output; + stats.m_FeeReserve -= fs.m_Output; } if (bModified) { + stats.SetSize(tx); m_Processor.FlushDB(); tx.Normalize(); } @@ -2775,14 +2790,10 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra Transaction::Ptr ptx; ptx.swap(ptxArg); - Transaction::Context::Params pars; - Transaction::Context ctx(pars); + TxPool::Stats stats; if (pElem) { - bool bValid = pElem->m_Height.IsInRange(m_Processor.m_Cursor.m_ID.m_Height + 1); - - ctx.m_Stats.m_Fee = pElem->m_Profit.m_Fee; - ctx.m_Height = pElem->m_Height; + stats = pElem->m_Profit.m_Stats; if (MaxHeight == pElem->m_Confirm.m_Height) { @@ -2794,9 +2805,6 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra else // fluff from m_Dandelion.Delete(*pElem); - - if (!bValid) - return proto::TxStatus::InvalidContext; } TxPool::Fluff::Element::Tx key; @@ -2809,9 +2817,7 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra const Transaction& tx = *ptx; // new transaction - uint32_t nSizeCorrection = 0; - Amount feeReserve = 0; - uint8_t nCode = pElem ? proto::TxStatus::Ok : ValidateTx(ctx, tx, nSizeCorrection, feeReserve, pExtraInfo); + uint8_t nCode = pElem ? proto::TxStatus::Ok : ValidateTx(stats, tx, pExtraInfo); LogTx(tx, nCode, key.m_Key); if (proto::TxStatus::Ok != nCode) { @@ -2834,7 +2840,7 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra } - TxPool::Fluff::Element* pNewTxElem = m_TxPool.AddValidTx(std::move(ptx), ctx, key.m_Key, nSizeCorrection); + TxPool::Fluff::Element* pNewTxElem = m_TxPool.AddValidTx(std::move(ptx), stats, key.m_Key); while (m_TxPool.m_setProfit.size() + m_TxPool.m_setOutdated.size() > m_Cfg.m_MaxPoolTransactions) { @@ -2922,7 +2928,7 @@ void Node::Dandelion::OnTimedOut(Element& x) { if (x.m_bAggregating) { - get_ParentObj().AddDummyOutputs(*x.m_pValue, x.m_FeeReserve); + get_ParentObj().AddDummyOutputs(*x.m_pValue, x.m_Profit.m_Stats); get_ParentObj().LogTxStem(*x.m_pValue, "Aggregation timed-out, dummies added"); get_ParentObj().OnTransactionAggregated(x); } @@ -3059,7 +3065,7 @@ void Node::Peer::BroadcastTxs() msgOut.m_ID = m_pCursorTx->m_Tx.m_Key; Send(msgOut); - nExtra += m_pCursorTx->m_Profit.m_nSize; + nExtra += m_pCursorTx->m_Profit.m_Stats.m_Size; if (IsChocking(nExtra)) break; } diff --git a/node/node.h b/node/node.h index 155070ac3..9e8cb1ef5 100644 --- a/node/node.h +++ b/node/node.h @@ -395,14 +395,14 @@ struct Node void OnTransactionAggregated(Dandelion::Element&); void OnTransactionWaitingConfirm(TxPool::Stem::Element&); void PerformAggregation(Dandelion::Element&); - void AddDummyInputs(Transaction&); + void AddDummyInputs(Transaction&, TxPool::Stats&); bool AddDummyInputRaw(Transaction& tx, const CoinID&); bool AddDummyInputEx(Transaction& tx, const CoinID&); - void AddDummyOutputs(Transaction&, Amount feeReserve); + void AddDummyOutputs(Transaction&, TxPool::Stats&); Height SampleDummySpentHeight(); void DeleteOutdated(); - uint8_t ValidateTx(Transaction::Context&, const Transaction&, uint32_t& nSizeCorrection, Amount& feeReserve, std::ostream* pExtraInfo); // complete validation + uint8_t ValidateTx(TxPool::Stats&, const Transaction&, std::ostream* pExtraInfo); // complete validation uint8_t ValidateTx2(Transaction::Context&, const Transaction&, uint32_t& nBvmCharge, Amount& feeReserve, TxPool::Dependent::Element* pParent, std::ostream* pExtraInfo); // complete validation static bool CalculateFeeReserve(const TxStats&, const HeightRange&, const AmountBig::Type&, uint32_t nBvmCharge, Amount& feeReserve); void LogTx(const Transaction&, uint8_t nStatus, const Transaction::KeyType&); diff --git a/node/processor.cpp b/node/processor.cpp index aa28e82b6..e7eafc02d 100644 --- a/node/processor.cpp +++ b/node/processor.cpp @@ -6029,18 +6029,12 @@ size_t NodeProcessor::GenerateNewBlockInternal(BlockContext& bc, BlockInterpretC { TxPool::Fluff::Element& x = (it++)->get_ParentObj(); - if (AmountBig::get_Hi(x.m_Profit.m_Fee)) - { - // huge fees are unsupported - bc.m_TxPool.Delete(x); - continue; - } - Amount feesNext = bc.m_Fees + AmountBig::get_Lo(x.m_Profit.m_Fee); + Amount feesNext = bc.m_Fees + x.m_Profit.m_Stats.m_Fee; if (feesNext < bc.m_Fees) continue; // huge fees are unsupported - size_t nSizeNext = ssc.m_Counter.m_Value + x.m_Profit.m_nSize; + size_t nSizeNext = ssc.m_Counter.m_Value + x.m_Profit.m_Stats.m_Size; if (!bc.m_Fees && feesNext) nSizeNext += m_nSizeUtxoComission; @@ -6059,7 +6053,7 @@ size_t NodeProcessor::GenerateNewBlockInternal(BlockContext& bc, BlockInterpretC Transaction& tx = *x.m_pValue; - bool bDelete = !x.m_Height.IsInRange(bic.m_Height); + bool bDelete = !x.m_Profit.m_Stats.m_Hr.IsInRange(bic.m_Height); if (!bDelete) { assert(!bic.m_LimitExceeded); diff --git a/node/txpool.cpp b/node/txpool.cpp index d135edeaa..2c00991cd 100644 --- a/node/txpool.cpp +++ b/node/txpool.cpp @@ -25,40 +25,40 @@ void save_VecPtr(Archive& ar, const std::vector& v) ar & *v[i]; } -void TxPool::Profit::SetSize(const Transaction& tx, uint32_t nCorrection) +void TxPool::Stats::SetSize(const Transaction& tx) { - m_nSize = (uint32_t) tx.get_Reader().get_SizeNetto(); - m_nSizeCorrected = m_nSize + nCorrection; + m_Size = (uint32_t)tx.get_Reader().get_SizeNetto(); } -uint32_t TxPool::Profit::get_Correction() const +void TxPool::Stats::From(const Transaction& tx, const Transaction::Context& ctx, Amount feeReserve, uint32_t nSizeCorrection) { - uint32_t ret; - m_nSizeCorrected.Export(ret); - return ret - m_nSize; + assert(!AmountBig::get_Hi(ctx.m_Stats.m_Fee)); // ignore such txs atm + m_Fee = AmountBig::get_Lo(ctx.m_Stats.m_Fee); + m_FeeReserve = feeReserve; + m_Hr = ctx.m_Height; + + SetSize(tx); + m_SizeCorrection = nSizeCorrection; } bool TxPool::Profit::operator < (const Profit& t) const { // handle overflow. To be precise need to use big-int (96-bit) arithmetics - // return m_Fee * t.m_nSize > t.m_Fee * m_nSize; return - (m_Fee * t.m_nSizeCorrected) > - (t.m_Fee * m_nSizeCorrected); + (uintBigFrom(m_Stats.m_Fee) * uintBigFrom(t.m_Stats.m_Size + t.m_Stats.m_SizeCorrection)) > + (uintBigFrom(t.m_Stats.m_Fee) * uintBigFrom(m_Stats.m_Size + m_Stats.m_SizeCorrection)); } ///////////////////////////// // Fluff -TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, const Transaction::Context& ctx, const Transaction::KeyType& key, uint32_t nSizeCorrection) +TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, const Stats& stats, const Transaction::KeyType& key) { assert(pValue); Element* p = new Element; p->m_pValue = std::move(pValue); - p->m_Height = ctx.m_Height; - p->m_Profit.m_Fee = ctx.m_Stats.m_Fee; - p->m_Profit.SetSize(*p->m_pValue, nSizeCorrection); + p->m_Profit.m_Stats = stats; p->m_Tx.m_Key = key; p->m_Outdated.m_Height = MaxHeight; assert(!p->IsOutdated()); @@ -140,8 +140,8 @@ bool TxPool::Stem::TryMerge(Element& trg, Element& src) { assert(trg.m_bAggregating && src.m_bAggregating); - HeightRange hr = trg.m_Height; - hr.Intersect(src.m_Height); + HeightRange hr = trg.m_Profit.m_Stats.m_Hr; + hr.Intersect(src.m_Profit.m_Stats.m_Hr); if (hr.IsEmpty()) return false; @@ -160,25 +160,26 @@ bool TxPool::Stem::TryMerge(Element& trg, Element& src) // assert(txNew.IsValid(ctx)); //#endif // _DEBUG - auto fees = trg.m_Profit.m_Fee; - fees += src.m_Profit.m_Fee; + auto fees = trg.m_Profit.m_Stats.m_Fee; + fees += src.m_Profit.m_Stats.m_Fee; Amount feeReserve = 0; if (!ValidateTxContext(txNew, hr, fees, feeReserve)) return false; // conflicting txs, can't merge - trg.m_Profit.m_Fee += fees; - trg.m_Profit.SetSize(txNew, trg.m_Profit.get_Correction() + src.m_Profit.get_Correction()); - Delete(src); - DeleteKrn(trg); + trg.m_Profit.m_Stats.m_Fee += src.m_Profit.m_Stats.m_Fee; + trg.m_Profit.m_Stats.m_FeeReserve = feeReserve; + trg.m_Profit.m_Stats.m_Hr = hr; + trg.m_Profit.m_Stats.SetSize(txNew); + trg.m_Profit.m_Stats.m_SizeCorrection += src.m_Profit.m_Stats.m_SizeCorrection; trg.m_pValue->m_vInputs.swap(txNew.m_vInputs); trg.m_pValue->m_vOutputs.swap(txNew.m_vOutputs); trg.m_pValue->m_vKernels.swap(txNew.m_vKernels); trg.m_pValue->m_Offset = txNew.m_Offset; - trg.m_FeeReserve = feeReserve; - trg.m_Height = hr; + Delete(src); + DeleteKrn(trg); InsertKrn(trg); return true; diff --git a/node/txpool.h b/node/txpool.h index 7bf9972d4..29ac506b3 100644 --- a/node/txpool.h +++ b/node/txpool.h @@ -22,15 +22,22 @@ namespace beam { struct TxPool { + struct Stats + { + Amount m_Fee; + Amount m_FeeReserve; + uint32_t m_Size; + uint32_t m_SizeCorrection; + HeightRange m_Hr; + + void From(const Transaction&, const Transaction::Context&, Amount feeReserve, uint32_t nSizeCorrection); + void SetSize(const Transaction&); + }; + struct Profit :public boost::intrusive::set_base_hook<> { - AmountBig::Type m_Fee; // since a tx may include multiple kernels - theoretically fee may be huge (though highly unlikely) - uint32_t m_nSize; - uintBigFor::Type m_nSizeCorrected; - - void SetSize(const Transaction&, uint32_t nCorrection); - uint32_t get_Correction() const; + Stats m_Stats; bool operator < (const Profit& t) const; }; @@ -42,11 +49,8 @@ struct TxPool Transaction::Ptr m_pValue; struct Tx - :public boost::intrusive::set_base_hook<> + :public intrusive::set_base_hook { - Transaction::KeyType m_Key; - - bool operator < (const Tx& t) const { return m_Key < t.m_Key; } IMPLEMENT_GET_PARENT_OBJ(Element, m_Tx) } m_Tx; @@ -56,8 +60,6 @@ struct TxPool IMPLEMENT_GET_PARENT_OBJ(Element, m_Profit) } m_Profit; - HeightRange m_Height; - struct Outdated :public boost::intrusive::set_base_hook<> { @@ -87,7 +89,7 @@ struct TxPool OutdatedSet m_setOutdated; Queue m_Queue; - Element* AddValidTx(Transaction::Ptr&&, const Transaction::Context&, const Transaction::KeyType&, uint32_t nSizeCorrection); + Element* AddValidTx(Transaction::Ptr&&, const Stats&, const Transaction::KeyType&); void SetOutdated(Element&, Height); void Delete(Element&); void DeleteEmpty(Element&); @@ -140,8 +142,7 @@ struct TxPool IMPLEMENT_GET_PARENT_OBJ(Element, m_Confirm) } m_Confirm; - HeightRange m_Height; - Amount m_FeeReserve; + Stats m_Stats; std::vector m_vKrn; }; diff --git a/node/unittests/node_test.cpp b/node/unittests/node_test.cpp index 46457a5cf..e60760858 100644 --- a/node/unittests/node_test.cpp +++ b/node/unittests/node_test.cpp @@ -1071,7 +1071,10 @@ namespace beam Transaction::KeyType key; pTx->get_Key(key); - np.m_TxPool.AddValidTx(std::move(pTx), ctx, key, 0); + TxPool::Stats stats; + stats.From(*pTx, ctx, 0, 0); + + np.m_TxPool.AddValidTx(std::move(pTx), stats, key); } NodeProcessor::BlockContext bc(np.m_TxPool, 0, *np.m_Wallet.m_pKdf, *np.m_Wallet.m_pKdf); From 349c2f5e9e6ba28d67254ce0e95d94e172f22384 Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 00:26:52 +0200 Subject: [PATCH 03/12] Node: TxPool improvement, WIP (2) --- node/node.cpp | 14 +++++++------- node/txpool.cpp | 11 +++++++---- node/txpool.h | 6 +++--- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/node/node.cpp b/node/node.cpp index ce5ac45eb..a4103ef27 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -561,9 +561,9 @@ void Node::DeleteOutdated() { h = m_Processor.m_Cursor.m_ID.m_Height - h; - while (!m_TxPool.m_setOutdated.empty()) + while (!m_TxPool.m_lstOutdated.empty()) { - TxPool::Fluff::Element& x = m_TxPool.m_setOutdated.begin()->get_ParentObj(); + TxPool::Fluff::Element& x = m_TxPool.m_lstOutdated.front().get_ParentObj(); if (x.m_Outdated.m_Height > h) break; @@ -752,9 +752,9 @@ void Node::Processor::OnRolledBack() LOG_INFO() << "Rolled back to: " << m_Cursor.m_ID; TxPool::Fluff& txp = get_ParentObj().m_TxPool; - while (!txp.m_setOutdated.empty()) + while (!txp.m_lstOutdated.empty()) { - TxPool::Fluff::Element& x = txp.m_setOutdated.rbegin()->get_ParentObj(); + TxPool::Fluff::Element& x = txp.m_lstOutdated.back().get_ParentObj(); if (x.m_Outdated.m_Height <= m_Cursor.m_ID.m_Height) break; @@ -2842,11 +2842,11 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra TxPool::Fluff::Element* pNewTxElem = m_TxPool.AddValidTx(std::move(ptx), stats, key.m_Key); - while (m_TxPool.m_setProfit.size() + m_TxPool.m_setOutdated.size() > m_Cfg.m_MaxPoolTransactions) + while (m_TxPool.m_setProfit.size() + m_TxPool.m_lstOutdated.size() > m_Cfg.m_MaxPoolTransactions) { - TxPool::Fluff::Element& txDel = m_TxPool.m_setOutdated.empty() ? + TxPool::Fluff::Element& txDel = m_TxPool.m_lstOutdated.empty() ? m_TxPool.m_setProfit.rbegin()->get_ParentObj() : - m_TxPool.m_setOutdated.begin()->get_ParentObj(); + m_TxPool.m_lstOutdated.front().get_ParentObj(); if (&txDel == pNewTxElem) pNewTxElem = nullptr; // Anti-spam protection: in case the maximum pool capacity is reached - ensure this tx is any better BEFORE broadcasting ti diff --git a/node/txpool.cpp b/node/txpool.cpp index 2c00991cd..c99910da5 100644 --- a/node/txpool.cpp +++ b/node/txpool.cpp @@ -81,7 +81,10 @@ void TxPool::Fluff::SetOutdated(Element& x, Height h) void TxPool::Fluff::InternalInsert(Element& x) { if (x.IsOutdated()) - m_setOutdated.insert(x.m_Outdated); + { + assert(m_lstOutdated.empty() || (m_lstOutdated.back().m_Height <= x.m_Outdated.m_Height)); // order must be preserved + m_lstOutdated.push_back(x.m_Outdated); + } else { m_setTxs.insert(x.m_Tx); @@ -92,7 +95,7 @@ void TxPool::Fluff::InternalInsert(Element& x) void TxPool::Fluff::InternalErase(Element& x) { if (x.IsOutdated()) - m_setOutdated.erase(OutdatedSet::s_iterator_to(x.m_Outdated)); + m_lstOutdated.erase(OutdatedList::s_iterator_to(x.m_Outdated)); else { m_setTxs.erase(TxSet::s_iterator_to(x.m_Tx)); @@ -130,8 +133,8 @@ void TxPool::Fluff::Clear() while (!m_setProfit.empty()) Delete(m_setProfit.begin()->get_ParentObj()); - while (!m_setOutdated.empty()) - Delete(m_setOutdated.begin()->get_ParentObj()); + while (!m_lstOutdated.empty()) + Delete(m_lstOutdated.begin()->get_ParentObj()); } ///////////////////////////// diff --git a/node/txpool.h b/node/txpool.h index 29ac506b3..a75203757 100644 --- a/node/txpool.h +++ b/node/txpool.h @@ -61,7 +61,7 @@ struct TxPool } m_Profit; struct Outdated - :public boost::intrusive::set_base_hook<> + :public boost::intrusive::list_base_hook<> { Height m_Height; @@ -81,12 +81,12 @@ struct TxPool typedef boost::intrusive::multiset TxSet; typedef boost::intrusive::multiset ProfitSet; - typedef boost::intrusive::multiset OutdatedSet; + typedef boost::intrusive::list OutdatedList; typedef boost::intrusive::list Queue; TxSet m_setTxs; ProfitSet m_setProfit; - OutdatedSet m_setOutdated; + OutdatedList m_lstOutdated; Queue m_Queue; Element* AddValidTx(Transaction::Ptr&&, const Stats&, const Transaction::KeyType&); From 6f30114fe1b51c20430b1e352f27bb45ab6f0c07 Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 02:11:38 +0200 Subject: [PATCH 04/12] Node: TxPool improvement, WIP (3) --- node/node.cpp | 25 +++++++++++++------------ node/node.h | 4 ++-- node/txpool.cpp | 49 +++++++++++++++++++++++++++++++++---------------- node/txpool.h | 15 ++++++++------- 4 files changed, 56 insertions(+), 37 deletions(-) diff --git a/node/node.cpp b/node/node.cpp index a4103ef27..defbcbbc5 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -768,7 +768,7 @@ void Node::Processor::OnRolledBack() if (!IsShieldedInPool(*x.m_pValue)) { get_ParentObj().OnTransactionDeferred(std::move(x.m_pValue), nullptr, nullptr, true); - get_ParentObj().m_TxPool.DeleteEmpty(x); + get_ParentObj().m_TxPool.Delete(x); } } @@ -2869,7 +2869,7 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra continue; peer.Send(msgOut); - peer.SetTxCursor(pNewTxElem); + peer.SetTxCursor(pNewTxElem->m_pSend); } if (m_Miner.IsEnabled() && !m_Miner.m_pTaskToFinalize) @@ -3021,7 +3021,7 @@ void Node::Peer::OnChocking() } } -void Node::Peer::SetTxCursor(TxPool::Fluff::Element* p) +void Node::Peer::SetTxCursor(TxPool::Fluff::Element::Send* p) { if (m_pCursorTx) { @@ -3031,7 +3031,7 @@ void Node::Peer::SetTxCursor(TxPool::Fluff::Element* p) m_pCursorTx = p; if (m_pCursorTx) - m_pCursorTx->m_Queue.m_Refs++; + m_pCursorTx->m_Refs++; } void Node::Peer::BroadcastTxs() @@ -3044,28 +3044,29 @@ void Node::Peer::BroadcastTxs() for (size_t nExtra = 0; ; ) { - TxPool::Fluff::Queue::iterator itNext; + TxPool::Fluff::SendQueue::iterator itNext; if (m_pCursorTx) { - itNext = TxPool::Fluff::Queue::s_iterator_to(m_pCursorTx->m_Queue); + itNext = TxPool::Fluff::SendQueue::s_iterator_to(*m_pCursorTx); ++itNext; } else - itNext = m_This.m_TxPool.m_Queue.begin(); + itNext = m_This.m_TxPool.m_SendQueue.begin(); - if (m_This.m_TxPool.m_Queue.end() == itNext) + if (m_This.m_TxPool.m_SendQueue.end() == itNext) break; // all sent - SetTxCursor(&itNext->get_ParentObj()); + SetTxCursor(&(*itNext)); - if (!m_pCursorTx->m_pValue || m_pCursorTx->IsOutdated()) + if (!m_pCursorTx->m_pThis) continue; // already deleted + auto& x = *m_pCursorTx->m_pThis; proto::HaveTransaction msgOut; - msgOut.m_ID = m_pCursorTx->m_Tx.m_Key; + msgOut.m_ID = x.m_Tx.m_Key; Send(msgOut); - nExtra += m_pCursorTx->m_Profit.m_Stats.m_Size; + nExtra += x.m_Profit.m_Stats.m_Size; if (IsChocking(nExtra)) break; } diff --git a/node/node.h b/node/node.h index 9e8cb1ef5..0456b76b0 100644 --- a/node/node.h +++ b/node/node.h @@ -525,7 +525,7 @@ struct Node std::unique_ptr n_pDependentContext; uint64_t m_CursorBbs; - TxPool::Fluff::Element* m_pCursorTx; + TxPool::Fluff::Element::Send* m_pCursorTx; TaskList m_lstTasks; std::set m_setRejected; // data that shouldn't be requested from this peer. Reset after reconnection or on receiving NewTip @@ -552,7 +552,7 @@ struct Node void BroadcastBbs(Bbs::Subscription&); void MaybeSendSerif(); void OnChocking(); - void SetTxCursor(TxPool::Fluff::Element*); + void SetTxCursor(TxPool::Fluff::Element::Send*); bool GetBlock(proto::BodyBuffers&, const NodeDB::StateID&, const proto::GetBodyPack&, bool bActive); bool IsChocking(size_t nExtra = 0); diff --git a/node/txpool.cpp b/node/txpool.cpp index c99910da5..12483b3b7 100644 --- a/node/txpool.cpp +++ b/node/txpool.cpp @@ -65,17 +65,40 @@ TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, con InternalInsert(*p); - p->m_Queue.m_Refs = 1; - m_Queue.push_back(p->m_Queue); + EnsureSend(*p, true); return p; } +void TxPool::Fluff::EnsureSend(Element& x, bool b) +{ + if (b == !!x.m_pSend) + return; // no change + + if (x.m_pSend) + { + assert(x.m_pSend && (&x == x.m_pSend->m_pThis)); + + x.m_pSend->m_pThis = nullptr; + Release(*x.m_pSend); + x.m_pSend = nullptr; + } + else + { + x.m_pSend = new Element::Send; + x.m_pSend->m_Refs = 1; + x.m_pSend->m_pThis = &x; + m_SendQueue.push_back(*x.m_pSend); + } +} + void TxPool::Fluff::SetOutdated(Element& x, Height h) { InternalErase(x); x.m_Outdated.m_Height = h; InternalInsert(x); + + EnsureSend(x, !x.IsOutdated()); } void TxPool::Fluff::InternalInsert(Element& x) @@ -105,25 +128,19 @@ void TxPool::Fluff::InternalErase(Element& x) void TxPool::Fluff::Delete(Element& x) { - assert(x.m_pValue); - x.m_pValue.reset(); - DeleteEmpty(x); -} - -void TxPool::Fluff::DeleteEmpty(Element& x) -{ - assert(!x.m_pValue); InternalErase(x); - Release(x); + EnsureSend(x, false); + + delete &x; } -void TxPool::Fluff::Release(Element& x) +void TxPool::Fluff::Release(Element::Send& x) { - assert(x.m_Queue.m_Refs); - if (!--x.m_Queue.m_Refs) + assert(x.m_Refs); + if (!--x.m_Refs) { - assert(!x.m_pValue); - m_Queue.erase(Queue::s_iterator_to(x.m_Queue)); + assert(!x.m_pThis); + m_SendQueue.erase(SendQueue::s_iterator_to(x)); delete &x; } } diff --git a/node/txpool.h b/node/txpool.h index a75203757..417ce28a7 100644 --- a/node/txpool.h +++ b/node/txpool.h @@ -69,12 +69,13 @@ struct TxPool IMPLEMENT_GET_PARENT_OBJ(Element, m_Outdated) } m_Outdated; - struct Queue + struct Send :public boost::intrusive::list_base_hook<> { + Element* m_pThis; uint32_t m_Refs = 0; - IMPLEMENT_GET_PARENT_OBJ(Element, m_Queue) - } m_Queue; + }; + Send* m_pSend = nullptr; bool IsOutdated() const { return MaxHeight != m_Outdated.m_Height; } }; @@ -82,18 +83,17 @@ struct TxPool typedef boost::intrusive::multiset TxSet; typedef boost::intrusive::multiset ProfitSet; typedef boost::intrusive::list OutdatedList; - typedef boost::intrusive::list Queue; + typedef boost::intrusive::list SendQueue; TxSet m_setTxs; ProfitSet m_setProfit; OutdatedList m_lstOutdated; - Queue m_Queue; + SendQueue m_SendQueue; Element* AddValidTx(Transaction::Ptr&&, const Stats&, const Transaction::KeyType&); void SetOutdated(Element&, Height); void Delete(Element&); - void DeleteEmpty(Element&); - void Release(Element&); + void Release(Element::Send&); void Clear(); ~Fluff() { Clear(); } @@ -101,6 +101,7 @@ struct TxPool private: void InternalInsert(Element&); void InternalErase(Element&); + void EnsureSend(Element&, bool); }; struct Stem From 2fb6191fb9ea8da77402dee8b9e3d9e1afa41c40 Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 02:59:39 +0200 Subject: [PATCH 05/12] Node: TxPool improvement, WIP (4) --- node/node.cpp | 15 +++-- node/processor.cpp | 5 +- node/txpool.cpp | 112 ++++++++++++++++++++++------------- node/txpool.h | 43 +++++++++----- node/unittests/node_test.cpp | 2 +- 5 files changed, 113 insertions(+), 64 deletions(-) diff --git a/node/node.cpp b/node/node.cpp index defbcbbc5..c16a8759e 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -564,7 +564,7 @@ void Node::DeleteOutdated() while (!m_TxPool.m_lstOutdated.empty()) { TxPool::Fluff::Element& x = m_TxPool.m_lstOutdated.front().get_ParentObj(); - if (x.m_Outdated.m_Height > h) + if (x.m_Hist.m_Height > h) break; m_TxPool.Delete(x); @@ -577,8 +577,11 @@ void Node::DeleteOutdated() Transaction& tx = *x.m_pValue; uint32_t nBvmCharge = 0; - if (proto::TxStatus::Ok != m_Processor.ValidateTxContextEx(tx, x.m_Profit.m_Stats.m_Hr, true, nBvmCharge, nullptr, nullptr)) - m_TxPool.SetOutdated(x, m_Processor.m_Cursor.m_ID.m_Height); + if (proto::TxStatus::Ok != m_Processor.ValidateTxContextEx(tx, x.m_Profit.m_Stats.m_Hr, true, nBvmCharge, nullptr, nullptr)) + { + x.m_Hist.m_Height = m_Processor.m_Cursor.m_ID.m_Height; + m_TxPool.SetState(x, TxPool::Fluff::State::Outdated); + } } for (TxPool::Stem::TimeSet::iterator it = m_Dandelion.m_setTime.begin(); m_Dandelion.m_setTime.end() != it; ) @@ -755,10 +758,10 @@ void Node::Processor::OnRolledBack() while (!txp.m_lstOutdated.empty()) { TxPool::Fluff::Element& x = txp.m_lstOutdated.back().get_ParentObj(); - if (x.m_Outdated.m_Height <= m_Cursor.m_ID.m_Height) + if (x.m_Hist.m_Height <= m_Cursor.m_ID.m_Height) break; - txp.SetOutdated(x, MaxHeight); // may be deferred by the next loop + txp.SetState(x, TxPool::Fluff::State::Fluffed); // may be deferred by the next loop } // Shielded txs that referenced shielded outputs which were reverted - must be reprocessed @@ -2840,7 +2843,7 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra } - TxPool::Fluff::Element* pNewTxElem = m_TxPool.AddValidTx(std::move(ptx), stats, key.m_Key); + TxPool::Fluff::Element* pNewTxElem = m_TxPool.AddValidTx(std::move(ptx), stats, key.m_Key, TxPool::Fluff::State::Fluffed); while (m_TxPool.m_setProfit.size() + m_TxPool.m_lstOutdated.size() > m_Cfg.m_MaxPoolTransactions) { diff --git a/node/processor.cpp b/node/processor.cpp index e7eafc02d..1f05a2344 100644 --- a/node/processor.cpp +++ b/node/processor.cpp @@ -6076,7 +6076,10 @@ size_t NodeProcessor::GenerateNewBlockInternal(BlockContext& bc, BlockInterpretC } if (bDelete) - bc.m_TxPool.SetOutdated(x, h); // isn't available in this context + { + x.m_Hist.m_Height = m_Cursor.m_ID.m_Height; + bc.m_TxPool.SetState(x, TxPool::Fluff::State::Outdated); // isn't available in this context + } } LOG_INFO() << "GenerateNewBlock: size of block = " << ssc.m_Counter.m_Value << "; amount of tx = " << nTxNum; diff --git a/node/txpool.cpp b/node/txpool.cpp index 12483b3b7..e496e3dc2 100644 --- a/node/txpool.cpp +++ b/node/txpool.cpp @@ -52,7 +52,7 @@ bool TxPool::Profit::operator < (const Profit& t) const ///////////////////////////// // Fluff -TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, const Stats& stats, const Transaction::KeyType& key) +TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, const Stats& stats, const Transaction::KeyType& key, State s) { assert(pValue); @@ -60,76 +60,104 @@ TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, con p->m_pValue = std::move(pValue); p->m_Profit.m_Stats = stats; p->m_Tx.m_Key = key; - p->m_Outdated.m_Height = MaxHeight; - assert(!p->IsOutdated()); - InternalInsert(*p); + p->m_State = s; - EnsureSend(*p, true); + Features f0 = { false }; + SetState(*p, f0, Features::get(s)); return p; } -void TxPool::Fluff::EnsureSend(Element& x, bool b) +TxPool::Fluff::Features TxPool::Fluff::Features::get(State s) { - if (b == !!x.m_pSend) - return; // no change - - if (x.m_pSend) - { - assert(x.m_pSend && (&x == x.m_pSend->m_pThis)); - - x.m_pSend->m_pThis = nullptr; - Release(*x.m_pSend); - x.m_pSend = nullptr; - } - else + Features ret = { false }; + switch (s) { - x.m_pSend = new Element::Send; - x.m_pSend->m_Refs = 1; - x.m_pSend->m_pThis = &x; - m_SendQueue.push_back(*x.m_pSend); + case State::Fluffed: + ret.m_TxSet = true; + ret.m_Send = true; + break; + + case State::PreFluffed: + ret.m_TxSet = true; + ret.m_WaitFluff = true; + break; + + case State::Outdated: + ret.m_Outdated = true; + break; + + default: // suppress warning + break; } + + return ret; } -void TxPool::Fluff::SetOutdated(Element& x, Height h) +void TxPool::Fluff::SetState(Element& x, State s) { - InternalErase(x); - x.m_Outdated.m_Height = h; - InternalInsert(x); + auto f0 = Features::get(x.m_State); + auto f = Features::get(s); - EnsureSend(x, !x.IsOutdated()); + x.m_State = s; + SetState(x, f0, f); } -void TxPool::Fluff::InternalInsert(Element& x) +void TxPool::Fluff::SetState(Element& x, Features f0, Features f) { - if (x.IsOutdated()) + if (f.m_Send != f0.m_Send) { - assert(m_lstOutdated.empty() || (m_lstOutdated.back().m_Height <= x.m_Outdated.m_Height)); // order must be preserved - m_lstOutdated.push_back(x.m_Outdated); + if (f.m_Send) + { + assert(!x.m_pSend); + + x.m_pSend = new Element::Send; + x.m_pSend->m_Refs = 1; + x.m_pSend->m_pThis = &x; + m_SendQueue.push_back(*x.m_pSend); + } + else + { + assert(x.m_pSend && (&x == x.m_pSend->m_pThis)); + + x.m_pSend->m_pThis = nullptr; + Release(*x.m_pSend); + x.m_pSend = nullptr; + } } - else + + if (f.m_TxSet != f0.m_TxSet) { - m_setTxs.insert(x.m_Tx); - m_setProfit.insert(x.m_Profit); + if (f.m_TxSet) + m_setProfit.insert(x.m_Profit); + else + m_setProfit.erase(ProfitSet::s_iterator_to(x.m_Profit)); } + + SetStateHist(x, m_lstOutdated, f0.m_Outdated, f.m_Outdated); + SetStateHist(x, m_lstWaitFluff, f0.m_WaitFluff, f.m_WaitFluff); } -void TxPool::Fluff::InternalErase(Element& x) +void TxPool::Fluff::SetStateHist(Element& x, HistList& lst, bool b0, bool b) { - if (x.IsOutdated()) - m_lstOutdated.erase(OutdatedList::s_iterator_to(x.m_Outdated)); - else + if (b != b0) { - m_setTxs.erase(TxSet::s_iterator_to(x.m_Tx)); - m_setProfit.erase(ProfitSet::s_iterator_to(x.m_Profit)); + if (b) + { + assert(lst.empty() || (lst.back().m_Height <= x.m_Hist.m_Height)); // order must be preserved + lst.push_back(x.m_Hist); + } + else + lst.erase(HistList::s_iterator_to(x.m_Hist)); } } void TxPool::Fluff::Delete(Element& x) { - InternalErase(x); - EnsureSend(x, false); + auto f0 = Features::get(x.m_State); + Features f = { false }; + SetState(x, f0, f); delete &x; } diff --git a/node/txpool.h b/node/txpool.h index 417ce28a7..b2deb8fa8 100644 --- a/node/txpool.h +++ b/node/txpool.h @@ -44,9 +44,16 @@ struct TxPool struct Fluff { + enum State { + PreFluffed, + Fluffed, + Outdated, + }; + struct Element { Transaction::Ptr m_pValue; + State m_State; struct Tx :public intrusive::set_base_hook @@ -60,14 +67,12 @@ struct TxPool IMPLEMENT_GET_PARENT_OBJ(Element, m_Profit) } m_Profit; - struct Outdated + struct Hist :public boost::intrusive::list_base_hook<> { Height m_Height; - - bool operator < (const Outdated& t) const { return m_Height < t.m_Height; } - IMPLEMENT_GET_PARENT_OBJ(Element, m_Outdated) - } m_Outdated; + IMPLEMENT_GET_PARENT_OBJ(Element, m_Hist) + } m_Hist; struct Send :public boost::intrusive::list_base_hook<> @@ -76,22 +81,21 @@ struct TxPool uint32_t m_Refs = 0; }; Send* m_pSend = nullptr; - - bool IsOutdated() const { return MaxHeight != m_Outdated.m_Height; } }; typedef boost::intrusive::multiset TxSet; typedef boost::intrusive::multiset ProfitSet; - typedef boost::intrusive::list OutdatedList; + typedef boost::intrusive::list HistList; typedef boost::intrusive::list SendQueue; TxSet m_setTxs; ProfitSet m_setProfit; - OutdatedList m_lstOutdated; SendQueue m_SendQueue; + HistList m_lstOutdated; + HistList m_lstWaitFluff; - Element* AddValidTx(Transaction::Ptr&&, const Stats&, const Transaction::KeyType&); - void SetOutdated(Element&, Height); + Element* AddValidTx(Transaction::Ptr&&, const Stats&, const Transaction::KeyType&, State); + void SetState(Element&, State); void Delete(Element&); void Release(Element::Send&); void Clear(); @@ -99,9 +103,20 @@ struct TxPool ~Fluff() { Clear(); } private: - void InternalInsert(Element&); - void InternalErase(Element&); - void EnsureSend(Element&, bool); + + struct Features + { + bool m_Send; + bool m_TxSet; + bool m_WaitFluff; + bool m_Outdated; + + static Features get(State); + }; + + void SetState(Element&, Features f0, Features f); + static void SetStateHist(Element&, HistList&, bool b0, bool b); + }; struct Stem diff --git a/node/unittests/node_test.cpp b/node/unittests/node_test.cpp index e60760858..fe0dec3e2 100644 --- a/node/unittests/node_test.cpp +++ b/node/unittests/node_test.cpp @@ -1074,7 +1074,7 @@ namespace beam TxPool::Stats stats; stats.From(*pTx, ctx, 0, 0); - np.m_TxPool.AddValidTx(std::move(pTx), stats, key); + np.m_TxPool.AddValidTx(std::move(pTx), stats, key, TxPool::Fluff::State::Fluffed); } NodeProcessor::BlockContext bc(np.m_TxPool, 0, *np.m_Wallet.m_pKdf, *np.m_Wallet.m_pKdf); From 50c90f1d209f2d266a508fd87a2ee9dc8e897a17 Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 04:13:55 +0200 Subject: [PATCH 06/12] m --- node/txpool.cpp | 3 ++- node/txpool.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/node/txpool.cpp b/node/txpool.cpp index e496e3dc2..0b9ed017a 100644 --- a/node/txpool.cpp +++ b/node/txpool.cpp @@ -52,7 +52,7 @@ bool TxPool::Profit::operator < (const Profit& t) const ///////////////////////////// // Fluff -TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, const Stats& stats, const Transaction::KeyType& key, State s) +TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, const Stats& stats, const Transaction::KeyType& key, State s, Height hLst /* = 0 */) { assert(pValue); @@ -60,6 +60,7 @@ TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, con p->m_pValue = std::move(pValue); p->m_Profit.m_Stats = stats; p->m_Tx.m_Key = key; + p->m_Hist.m_Height = hLst; p->m_State = s; diff --git a/node/txpool.h b/node/txpool.h index b2deb8fa8..c0a060cce 100644 --- a/node/txpool.h +++ b/node/txpool.h @@ -94,7 +94,7 @@ struct TxPool HistList m_lstOutdated; HistList m_lstWaitFluff; - Element* AddValidTx(Transaction::Ptr&&, const Stats&, const Transaction::KeyType&, State); + Element* AddValidTx(Transaction::Ptr&&, const Stats&, const Transaction::KeyType&, State, Height hLst = 0); void SetState(Element&, State); void Delete(Element&); void Release(Element::Send&); From 0e7cedd67229d5a8f3e9590b3a53295d731be9e9 Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 04:27:49 +0200 Subject: [PATCH 07/12] Node: TxPool improvement, WIP (5) --- node/node.cpp | 317 ++++++++++++++++++++++-------------------------- node/node.h | 6 +- node/txpool.cpp | 45 +++---- node/txpool.h | 14 +-- 4 files changed, 163 insertions(+), 219 deletions(-) diff --git a/node/node.cpp b/node/node.cpp index c16a8759e..55ae7439a 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -587,15 +587,12 @@ void Node::DeleteOutdated() for (TxPool::Stem::TimeSet::iterator it = m_Dandelion.m_setTime.begin(); m_Dandelion.m_setTime.end() != it; ) { TxPool::Stem::Element& x = (it++)->get_ParentObj(); - assert(MaxHeight == x.m_Confirm.m_Height); uint32_t nBvmCharge = 0; uint8_t nStatus = m_Processor.ValidateTxContextEx(*x.m_pValue, x.m_Profit.m_Stats.m_Hr, true, nBvmCharge, nullptr, nullptr); if (proto::TxStatus::Ok != nStatus) { - bool bDone = x.m_Profit.m_Stats.m_Hr.IsInRange(m_Processor.m_Cursor.m_ID.m_Height + 1); - LogTxStem(*x.m_pValue, bDone ? "confirmed without fluff" : "outdated"); - + LogTxStem(*x.m_pValue, "out-1"); m_Dandelion.Delete(x); } } @@ -604,28 +601,27 @@ void Node::DeleteOutdated() { h = m_Processor.m_Cursor.m_ID.m_Height - m_Cfg.m_Dandelion.m_dhStemConfirm; - while (!m_Dandelion.m_lstConfirm.empty()) + while (!m_TxPool.m_lstWaitFluff.empty()) { - auto& c = m_Dandelion.m_lstConfirm.front(); + auto& c = m_TxPool.m_lstWaitFluff.front(); if (c.m_Height >= h) break; auto& x = c.get_ParentObj(); - assert(!x.m_Time.m_Value); uint32_t nBvmCharge = 0; - uint8_t nStatus = m_Processor.ValidateTxContextEx(*x.m_pValue, x.m_Stats.m_Hr, true, nBvmCharge, nullptr, nullptr); + uint8_t nStatus = m_Processor.ValidateTxContextEx(*x.m_pValue, x.m_Profit.m_Stats.m_Hr, true, nBvmCharge, nullptr, nullptr); if (proto::TxStatus::Ok == nStatus) { - LogTxStem(*x.m_pValue, "Not confirmed, fluffing"); - OnTransactionFluff(std::move(x.m_pValue), nullptr, nullptr, &x); + LogTxStem(*x.m_pValue, "auto-fluffing"); + + m_TxPool.SetState(x, TxPool::Fluff::State::Fluffed); + OnTransactionFluff(x, nullptr); } else { - bool bDone = x.m_Profit.m_Stats.m_Hr.IsInRange(m_Processor.m_Cursor.m_ID.m_Height + 1); - LogTxStem(*x.m_pValue, bDone ? "confirm done" : "outdated"); - - m_Dandelion.Delete(x); + LogTxStem(*x.m_pValue, "out-2"); + m_TxPool.Delete(x); } } } @@ -783,13 +779,14 @@ void Node::Processor::OnRolledBack() txps.Delete(x); } - for (TxPool::Stem::ConfirmList::iterator it = txps.m_lstConfirm.begin(); txps.m_lstConfirm.end() != it; ) + while (!txp.m_lstWaitFluff.empty()) { - TxPool::Stem::Element& x = (it++)->get_ParentObj(); - if (!IsShieldedInPool(*x.m_pValue)) - txps.Delete(x); - } + TxPool::Fluff::Element& x = txp.m_lstWaitFluff.back().get_ParentObj(); + if (x.m_Hist.m_Height <= m_Cursor.m_ID.m_Height) + break; + txp.Delete(x); // never mind + } get_ParentObj().m_TxDependent.Clear(); @@ -2420,69 +2417,74 @@ uint8_t Node::OnTransactionStem(Transaction::Ptr&& ptx, std::ostream* pExtraInfo return proto::TxStatus::LimitExceeded; } + Transaction::KeyType keyTx; + ptx->get_Key(keyTx); + + auto itF = m_TxPool.m_setTxs.find(keyTx, TxPool::Fluff::Element::Tx::Comparator()); + TxPool::Fluff::Element* pF = (m_TxPool.m_setTxs.end() == itF) ? nullptr : &itF->get_ParentObj(); + TxPool::Stats stats; bool bTested = false; - TxPool::Stem::Element* pDup = nullptr; - // find match by kernels - for (size_t i = 0; i < ptx->m_vKernels.size(); i++) + if (pF) { - const TxKernel& krn = *ptx->m_vKernels[i]; - - TxPool::Stem::Element::Kernel key; - key.m_pKrn = &krn; + if (TxPool::Fluff::State::Fluffed == pF->m_State) + { + LogTxStem(*ptx, "Already fluffed"); + return proto::TxStatus::Ok; + } - TxPool::Stem::KrnSet::iterator it = m_Dandelion.m_setKrns.find(key); - if (m_Dandelion.m_setKrns.end() == it) - continue; + if (pF->m_Hist.m_Height == m_Processor.m_Cursor.m_Full.m_Height) + { + stats = pF->m_Profit.m_Stats; + bTested = true; + } - TxPool::Stem::Element* pElem = it->m_pThis; - bool bElemCovers = true, bNewCovers = true; - pElem->m_pValue->get_Reader().Compare(std::move(ptx->get_Reader()), bElemCovers, bNewCovers); + ptx = pF->m_pValue; // prefer it, to avoid ambiguity + } - if (!bNewCovers) - { - LogTxStem(*ptx, "obscured by another tx. Deleting"); - LogTxStem(*pElem->m_pValue, "Remaining"); - return proto::TxStatus::Obscured; // the new tx is reduced, drop it - } + LogTxStem(*ptx, pF ? "Already received" : "New"); - if (bElemCovers) + if (!bTested) + { + uint8_t nCode = ValidateTx(stats, *ptx, pExtraInfo); + if (proto::TxStatus::Ok != nCode) { - pDup = pElem; // exact match - - if (pDup->m_bAggregating) - { - LogTxStem(*ptx, "Received despite being-aggregated"); - return proto::TxStatus::Ok; // it shouldn't have been received, but nevermind, just ignore - } - - LogTxStem(*ptx, "Already received"); - break; + LogTxStem(*ptx, "invalid"); + return nCode; } + } - if (!bTested) - { - uint8_t nCode = ValidateTx(stats, *ptx, pExtraInfo); - if (proto::TxStatus::Ok != nCode) - return nCode; + bool bDontAggregate = true; + if (!pF) + { + bDontAggregate = + (ptx->m_vOutputs.size() >= m_Cfg.m_Dandelion.m_OutputsMax) || // already big enough + s.m_KernelsNonStd || // contains non-std elements + !m_Keys.m_pMiner; // can't manage decoys - bTested = true; - } + // add it to wait-fluff list BEFORE we modify it + Transaction::Ptr pTxOrig; + if (bDontAggregate) + pTxOrig = ptx; + else + { + // clone the tx, since we may modify it in the fure + pTxOrig = std::make_shared(); + TxVectors::Writer wtx(*pTxOrig, *pTxOrig); - LogTxStem(*pElem->m_pValue, "obscured by newer tx"); - OnTransactionWaitingConfirm(*pElem); + wtx.Dump(ptx->get_Reader()); + pTxOrig->m_Offset = ptx->m_Offset; + } + + pF = m_TxPool.AddValidTx(std::move(pTxOrig), stats, keyTx, TxPool::Fluff::State::PreFluffed, m_Processor.m_Cursor.m_Full.m_Height); + m_Wtx.Delete(keyTx); } - if (!pDup) + if (bDontAggregate) + OnTransactionAggregated(std::move(ptx), stats); + else { - if (!bTested) - { - uint8_t nCode = ValidateTx(stats, *ptx, pExtraInfo); - if (proto::TxStatus::Ok != nCode) - return nCode; - } - AddDummyInputs(*ptx, stats); auto pGuard = std::make_unique(); @@ -2492,25 +2494,10 @@ uint8_t Node::OnTransactionStem(Transaction::Ptr&& ptx, std::ostream* pExtraInfo pGuard->m_pValue.swap(ptx); m_Dandelion.InsertKrn(*pGuard); + m_Dandelion.InsertAggr(*pGuard); + auto* pElem = pGuard.release(); - pDup = pGuard.release(); - - LogTxStem(*pDup->m_pValue, "New"); - } - - assert(!pDup->m_bAggregating); - - bool bDontAggregate = - (pDup->m_pValue->m_vOutputs.size() >= m_Cfg.m_Dandelion.m_OutputsMax) || // already big enough - s.m_KernelsNonStd || // contains non-std elements - !m_Keys.m_pMiner; // can't manage decoys - - if (bDontAggregate) - OnTransactionAggregated(*pDup); - else - { - m_Dandelion.InsertAggr(*pDup); - PerformAggregation(*pDup); + PerformAggregation(*pElem); } return proto::TxStatus::Ok; @@ -2542,23 +2529,33 @@ void Node::OnTransactionAggregated(TxPool::Stem::Element& x) uint32_t nRandomPeerIdx = RandomUInt32(nStemPeers); for (PeerList::iterator it = m_lstPeers.begin(); ; ++it) - if ((it->m_LoginFlags & proto::LoginFlags::SpreadingTransactions) && !nRandomPeerIdx--) - { - if (m_Cfg.m_LogTxStem) - { - LOG_INFO() << "Stem continues to " << it->m_RemoteAddr; - } + if (sel.IsValid(*it) && !nRandomPeerIdx--) + return & *it; + } + } - it->SendTx(x.m_pValue, false); - break; - } + return nullptr; +} - // set random timer - uint32_t nTimeout_ms = m_Cfg.m_Dandelion.m_TimeoutMin_ms + RandomUInt32(m_Cfg.m_Dandelion.m_TimeoutMax_ms - m_Cfg.m_Dandelion.m_TimeoutMin_ms); - m_Dandelion.SetTimer(nTimeout_ms, x); +void Node::OnTransactionAggregated(Transaction::Ptr&& pTx, const TxPool::Stats& stats) +{ + LogTxStem(*pTx, "Aggregation finished"); - return; + Peer::Selector_Stem sel; + Peer* pNext = SelectRandomPeer(sel); + if (pNext) + { + if (m_Cfg.m_LogTxStem) + { + LOG_INFO() << "Stem continues to " << pNext->m_RemoteAddr; } + + pNext->SendTx(pTx, false); + } + else + { + LogTxStem(*pTx, "Going to fluff"); + OnTransactionFluff(std::move(pTx), nullptr, nullptr, &stats); } LogTxStem(*x.m_pValue, "Going to fluff"); @@ -2615,8 +2612,11 @@ void Node::PerformAggregation(TxPool::Stem::Element& x) LogTxStem(*x.m_pValue, "Aggregated so far"); if (x.m_pValue->m_vOutputs.size() >= m_Cfg.m_Dandelion.m_OutputsMin) - OnTransactionAggregated(x); - else + { + OnTransactionAggregated(std::move(x.m_pValue), x.m_Profit.m_Stats); + m_Dandelion.Delete(x); + } + else { LogTxStem(*x.m_pValue, "Aggregation pending"); m_Dandelion.SetTimer(m_Cfg.m_Dandelion.m_AggregationTime_ms, x); @@ -2776,92 +2776,64 @@ Height Node::SampleDummySpentHeight() return h; } -void Node::OnTransactionWaitingConfirm(TxPool::Stem::Element& x) -{ - m_Dandelion.DeleteAggr(x); - m_Dandelion.DeleteTimer(x); - - if (MaxHeight == x.m_Confirm.m_Height) - { - m_Dandelion.InsertConfirm(x, m_Processor.m_Cursor.m_Full.m_Height); - LogTxStem(*x.m_pValue, "Waiting confirmation"); - } -} - -uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtraInfo, const PeerID* pSender, TxPool::Stem::Element* pElem) +uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtraInfo, const PeerID* pSender, const TxPool::Stats* pStats) { Transaction::Ptr ptx; ptx.swap(ptxArg); - TxPool::Stats stats; - if (pElem) - { - stats = pElem->m_Profit.m_Stats; - - if (MaxHeight == pElem->m_Confirm.m_Height) - { - assert(!pElem->m_pValue); - pElem->m_pValue = ptx; // save ptr only, no need to clone, assuming it won't be changing - - OnTransactionWaitingConfirm(*pElem); - } - else - // fluff from - m_Dandelion.Delete(*pElem); - } - - TxPool::Fluff::Element::Tx key; - ptx->get_Key(key.m_Key); - - TxPool::Fluff::TxSet::iterator it = m_TxPool.m_setTxs.find(key); - if (m_TxPool.m_setTxs.end() != it) - return proto::TxStatus::Ok; - - const Transaction& tx = *ptx; - - // new transaction - uint8_t nCode = pElem ? proto::TxStatus::Ok : ValidateTx(stats, tx, pExtraInfo); - LogTx(tx, nCode, key.m_Key); + Transaction::KeyType keyTx; + ptx->get_Key(keyTx); - if (proto::TxStatus::Ok != nCode) { - return nCode; // stupid compiler insists on parentheses here! - } + auto itF = m_TxPool.m_setTxs.find(keyTx, TxPool::Fluff::Element::Tx::Comparator()); + TxPool::Fluff::Element* pF = (m_TxPool.m_setTxs.end() == itF) ? nullptr : &itF->get_ParentObj(); - m_Wtx.Delete(key.m_Key); + if (pF && (TxPool::Fluff::State::Fluffed == pF->m_State)) + return proto::TxStatus::Ok; // already fluffed - if (!pElem) + TxPool::Stats stats; + if (!pStats) { - for (size_t i = 0; i < ptx->m_vKernels.size(); i++) + bool bTested = pF && (pF->m_Hist.m_Height == m_Processor.m_Cursor.m_Full.m_Height); + if (!bTested) { - TxPool::Stem::Element::Kernel keyKrn; - keyKrn.m_pKrn = ptx->m_vKernels[i].get(); + const auto& pTxToTest = pF ? pF->m_pValue : ptx; // avoid ambiguity - TxPool::Stem::KrnSet::iterator itKrn = m_Dandelion.m_setKrns.find(keyKrn); - if (m_Dandelion.m_setKrns.end() != itKrn) - OnTransactionWaitingConfirm(*itKrn->m_pThis); - } + uint8_t nCode = ValidateTx(stats, *pTxToTest, pExtraInfo); + LogTx(*pTxToTest, nCode, keyTx); + if (proto::TxStatus::Ok != nCode) + return nCode; + } } - TxPool::Fluff::Element* pNewTxElem = m_TxPool.AddValidTx(std::move(ptx), stats, key.m_Key, TxPool::Fluff::State::Fluffed); + if (!pF) + { + pF = m_TxPool.AddValidTx(std::move(ptx), (pStats ? *pStats : stats), keyTx, TxPool::Fluff::State::Fluffed); + m_Wtx.Delete(keyTx); + } - while (m_TxPool.m_setProfit.size() + m_TxPool.m_lstOutdated.size() > m_Cfg.m_MaxPoolTransactions) - { + while (m_TxPool.m_setProfit.size() + m_TxPool.m_lstOutdated.size() > m_Cfg.m_MaxPoolTransactions) + { TxPool::Fluff::Element& txDel = m_TxPool.m_lstOutdated.empty() ? m_TxPool.m_setProfit.rbegin()->get_ParentObj() : m_TxPool.m_lstOutdated.front().get_ParentObj(); - if (&txDel == pNewTxElem) - pNewTxElem = nullptr; // Anti-spam protection: in case the maximum pool capacity is reached - ensure this tx is any better BEFORE broadcasting ti + if (&txDel == pF) + pF = nullptr; // Anti-spam protection: in case the maximum pool capacity is reached - ensure this tx is any better BEFORE broadcasting ti - m_TxPool.Delete(txDel); - } + m_TxPool.Delete(txDel); + } - if (!pNewTxElem) - return nCode; // though the tx is dropped, we return status ok. + if (pF) + OnTransactionFluff(*pF, pSender); + return proto::TxStatus::Ok; +} + +void Node::OnTransactionFluff(TxPool::Fluff::Element& x, const PeerID* pSender) +{ proto::HaveTransaction msgOut; - msgOut.m_ID = key.m_Key; + msgOut.m_ID = x.m_Tx.m_Key; for (PeerList::iterator it2 = m_lstPeers.begin(); m_lstPeers.end() != it2; ++it2) { @@ -2872,13 +2844,10 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra continue; peer.Send(msgOut); - peer.SetTxCursor(pNewTxElem->m_pSend); + peer.SetTxCursor(x.m_pSend); } - if (m_Miner.IsEnabled() && !m_Miner.m_pTaskToFinalize) - m_Miner.SetTimer(m_Cfg.m_Timeout.m_MiningSoftRestart_ms, false); - - return nCode; + m_Miner.SoftRestart(); } uint8_t Node::OnTransactionDependent(Transaction::Ptr&& pTx, const Merkle::Hash& hvCtx, const PeerID* pSender, bool bFluff, std::ostream* pExtraInfo) @@ -2932,13 +2901,15 @@ void Node::Dandelion::OnTimedOut(Element& x) if (x.m_bAggregating) { get_ParentObj().AddDummyOutputs(*x.m_pValue, x.m_Profit.m_Stats); - get_ParentObj().LogTxStem(*x.m_pValue, "Aggregation timed-out, dummies added"); - get_ParentObj().OnTransactionAggregated(x); - } + get_ParentObj().LogTxStem(*x.m_pValue, "Aggregation timed-out, dummies added"); + get_ParentObj().OnTransactionAggregated(std::move(x.m_pValue), x.m_Profit.m_Stats); + + get_ParentObj().m_Dandelion.Delete(x); + } else { - get_ParentObj().LogTxStem(*x.m_pValue, "Fluff timed-out. Emergency fluff"); - get_ParentObj().OnTransactionFluff(std::move(x.m_pValue), nullptr, nullptr, &x); + //get_ParentObj().LogTxStem(*x.m_pValue, "Fluff timed-out. Emergency fluff"); + //get_ParentObj().OnTransactionFluff(std::move(x.m_pValue), nullptr, nullptr, &x); } } diff --git a/node/node.h b/node/node.h index 0456b76b0..680ba74d2 100644 --- a/node/node.h +++ b/node/node.h @@ -390,10 +390,10 @@ struct Node void OnTransactionDeferred(Transaction::Ptr&&, std::unique_ptr&&, const PeerID*, bool bFluff); uint8_t OnTransactionStem(Transaction::Ptr&&, std::ostream* pExtraInfo); - uint8_t OnTransactionFluff(Transaction::Ptr&&, std::ostream* pExtraInfo, const PeerID*, Dandelion::Element*); + uint8_t OnTransactionFluff(Transaction::Ptr&&, std::ostream* pExtraInfo, const PeerID*, const TxPool::Stats*); + void OnTransactionFluff(TxPool::Fluff::Element&, const PeerID*); uint8_t OnTransactionDependent(Transaction::Ptr&& pTx, const Merkle::Hash& hvCtx, const PeerID* pSender, bool bFluff, std::ostream* pExtraInfo); - void OnTransactionAggregated(Dandelion::Element&); - void OnTransactionWaitingConfirm(TxPool::Stem::Element&); + void OnTransactionAggregated(Transaction::Ptr&&, const TxPool::Stats&); void PerformAggregation(Dandelion::Element&); void AddDummyInputs(Transaction&, TxPool::Stats&); bool AddDummyInputRaw(Transaction& tx, const CoinID&); diff --git a/node/txpool.cpp b/node/txpool.cpp index 0b9ed017a..fe83c406e 100644 --- a/node/txpool.cpp +++ b/node/txpool.cpp @@ -136,24 +136,27 @@ void TxPool::Fluff::SetState(Element& x, Features f0, Features f) m_setProfit.erase(ProfitSet::s_iterator_to(x.m_Profit)); } - SetStateHist(x, m_lstOutdated, f0.m_Outdated, f.m_Outdated); - SetStateHist(x, m_lstWaitFluff, f0.m_WaitFluff, f.m_WaitFluff); + SetStateHistOut(x, m_lstOutdated, f0.m_Outdated, f.m_Outdated); + SetStateHistOut(x, m_lstWaitFluff, f0.m_WaitFluff, f.m_WaitFluff); + SetStateHistIn(x, m_lstOutdated, f0.m_Outdated, f.m_Outdated); + SetStateHistIn(x, m_lstWaitFluff, f0.m_WaitFluff, f.m_WaitFluff); } -void TxPool::Fluff::SetStateHist(Element& x, HistList& lst, bool b0, bool b) +void TxPool::Fluff::SetStateHistIn(Element& x, HistList& lst, bool b0, bool b) { - if (b != b0) + if (b && !b0) { - if (b) - { - assert(lst.empty() || (lst.back().m_Height <= x.m_Hist.m_Height)); // order must be preserved - lst.push_back(x.m_Hist); - } - else - lst.erase(HistList::s_iterator_to(x.m_Hist)); + assert(lst.empty() || (lst.back().m_Height <= x.m_Hist.m_Height)); // order must be preserved + lst.push_back(x.m_Hist); } } +void TxPool::Fluff::SetStateHistOut(Element& x, HistList& lst, bool b0, bool b) +{ + if (b0 && !b) + lst.erase(HistList::s_iterator_to(x.m_Hist)); +} + void TxPool::Fluff::Delete(Element& x) { auto f0 = Features::get(x.m_State); @@ -255,7 +258,6 @@ void TxPool::Stem::DeleteRaw(Element& x) DeleteTimer(x); DeleteAggr(x); DeleteKrn(x); - DeleteConfirm(x); delete &x; } @@ -285,25 +287,6 @@ void TxPool::Stem::DeleteAggr(Element& x) } } -void TxPool::Stem::InsertConfirm(Element& x, Height h) -{ - DeleteConfirm(x); - if (MaxHeight != h) - { - x.m_Confirm.m_Height = h; - m_lstConfirm.push_back(x.m_Confirm); - } -} - -void TxPool::Stem::DeleteConfirm(Element& x) -{ - if (MaxHeight != x.m_Confirm.m_Height) - { - m_lstConfirm.erase(ConfirmList::s_iterator_to(x.m_Confirm)); - x.m_Confirm.m_Height = MaxHeight; - } -} - void TxPool::Stem::DeleteTimer(Element& x) { if (x.m_Time.m_Value) diff --git a/node/txpool.h b/node/txpool.h index c0a060cce..d67b018bc 100644 --- a/node/txpool.h +++ b/node/txpool.h @@ -115,7 +115,8 @@ struct TxPool }; void SetState(Element&, Features f0, Features f); - static void SetStateHist(Element&, HistList&, bool b0, bool b); + static void SetStateHistIn(Element&, HistList&, bool b0, bool b); + static void SetStateHistOut(Element&, HistList&, bool b0, bool b); }; @@ -151,13 +152,6 @@ struct TxPool bool operator < (const Kernel& t) const { return m_pKrn->m_Internal.m_ID < t.m_pKrn->m_Internal.m_ID; } }; - struct Confirm - :public boost::intrusive::list_base_hook<> - { - Height m_Height = MaxHeight; - IMPLEMENT_GET_PARENT_OBJ(Element, m_Confirm) - } m_Confirm; - Stats m_Stats; std::vector m_vKrn; @@ -166,12 +160,10 @@ struct TxPool typedef boost::intrusive::multiset KrnSet; typedef boost::intrusive::multiset TimeSet; typedef boost::intrusive::multiset ProfitSet; - typedef boost::intrusive::list ConfirmList; KrnSet m_setKrns; TimeSet m_setTime; ProfitSet m_setProfit; - ConfirmList m_lstConfirm; void Delete(Element&); void Clear(); @@ -180,8 +172,6 @@ struct TxPool void InsertAggr(Element&); void DeleteAggr(Element&); void DeleteTimer(Element&); - void InsertConfirm(Element&, Height); - void DeleteConfirm(Element&); bool TryMerge(Element& trg, Element& src); From 53ce5fe4bfd89ec68c3b6549f12cae32866f028d Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 04:41:14 +0200 Subject: [PATCH 08/12] Node: TxPool improvement, WIP (6) --- node/node.cpp | 20 ++++---------------- node/node.h | 2 -- node/txpool.cpp | 42 ++++-------------------------------------- node/txpool.h | 16 ---------------- 4 files changed, 8 insertions(+), 72 deletions(-) diff --git a/node/node.cpp b/node/node.cpp index 55ae7439a..14b760343 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -2488,12 +2488,10 @@ uint8_t Node::OnTransactionStem(Transaction::Ptr&& ptx, std::ostream* pExtraInfo AddDummyInputs(*ptx, stats); auto pGuard = std::make_unique(); - pGuard->m_bAggregating = false; pGuard->m_Time.m_Value = 0; pGuard->m_Profit.m_Stats = stats; pGuard->m_pValue.swap(ptx); - m_Dandelion.InsertKrn(*pGuard); m_Dandelion.InsertAggr(*pGuard); auto* pElem = pGuard.release(); @@ -2564,8 +2562,6 @@ void Node::OnTransactionAggregated(Transaction::Ptr&& pTx, const TxPool::Stats& void Node::PerformAggregation(TxPool::Stem::Element& x) { - assert(x.m_bAggregating); - bool bModified = false; // Aggregation policiy: first select those with worse profit, than those with better TxPool::Stem::ProfitSet::iterator it = TxPool::Stem::ProfitSet::s_iterator_to(x.m_Profit); @@ -2898,19 +2894,11 @@ uint8_t Node::OnTransactionDependent(Transaction::Ptr&& pTx, const Merkle::Hash& void Node::Dandelion::OnTimedOut(Element& x) { - if (x.m_bAggregating) - { - get_ParentObj().AddDummyOutputs(*x.m_pValue, x.m_Profit.m_Stats); - get_ParentObj().LogTxStem(*x.m_pValue, "Aggregation timed-out, dummies added"); - get_ParentObj().OnTransactionAggregated(std::move(x.m_pValue), x.m_Profit.m_Stats); + get_ParentObj().AddDummyOutputs(*x.m_pValue, x.m_Profit.m_Stats); + get_ParentObj().LogTxStem(*x.m_pValue, "Aggregation timed-out, dummies added"); + get_ParentObj().OnTransactionAggregated(std::move(x.m_pValue), x.m_Profit.m_Stats); - get_ParentObj().m_Dandelion.Delete(x); - } - else - { - //get_ParentObj().LogTxStem(*x.m_pValue, "Fluff timed-out. Emergency fluff"); - //get_ParentObj().OnTransactionFluff(std::move(x.m_pValue), nullptr, nullptr, &x); - } + get_ParentObj().m_Dandelion.Delete(x); } bool Node::Dandelion::ValidateTxContext(const Transaction& tx, const HeightRange& hr, const AmountBig::Type& fees, Amount& feeReserve) diff --git a/node/node.h b/node/node.h index 680ba74d2..4fd7fa565 100644 --- a/node/node.h +++ b/node/node.h @@ -134,8 +134,6 @@ struct Node struct Dandelion { uint16_t m_FluffProbability = 0x1999; // normalized wrt 16 bit. Equals to 0.1 - uint32_t m_TimeoutMin_ms = 20000; - uint32_t m_TimeoutMax_ms = 50000; uint32_t m_dhStemConfirm = 5; // if stem tx is not mined within this number of blocks - it's auto-fluffed uint32_t m_AggregationTime_ms = 10000; diff --git a/node/txpool.cpp b/node/txpool.cpp index fe83c406e..00ca17cde 100644 --- a/node/txpool.cpp +++ b/node/txpool.cpp @@ -190,8 +190,6 @@ void TxPool::Fluff::Clear() // Stem bool TxPool::Stem::TryMerge(Element& trg, Element& src) { - assert(trg.m_bAggregating && src.m_bAggregating); - HeightRange hr = trg.m_Profit.m_Stats.m_Hr; hr.Intersect(src.m_Profit.m_Stats.m_Hr); if (hr.IsEmpty()) @@ -231,8 +229,6 @@ bool TxPool::Stem::TryMerge(Element& trg, Element& src) trg.m_pValue->m_Offset = txNew.m_Offset; Delete(src); - DeleteKrn(trg); - InsertKrn(trg); return true; } @@ -257,34 +253,18 @@ void TxPool::Stem::DeleteRaw(Element& x) { DeleteTimer(x); DeleteAggr(x); - DeleteKrn(x); delete &x; } -void TxPool::Stem::DeleteKrn(Element& x) -{ - for (size_t i = 0; i < x.m_vKrn.size(); i++) - m_setKrns.erase(KrnSet::s_iterator_to(x.m_vKrn[i])); - x.m_vKrn.clear(); -} - void TxPool::Stem::InsertAggr(Element& x) { - if (!x.m_bAggregating) - { - x.m_bAggregating = true; - m_setProfit.insert(x.m_Profit); - } + m_setProfit.insert(x.m_Profit); } void TxPool::Stem::DeleteAggr(Element& x) { - if (x.m_bAggregating) - { - m_setProfit.erase(ProfitSet::s_iterator_to(x.m_Profit)); - x.m_bAggregating = false; - } + m_setProfit.erase(ProfitSet::s_iterator_to(x.m_Profit)); } void TxPool::Stem::DeleteTimer(Element& x) @@ -296,24 +276,10 @@ void TxPool::Stem::DeleteTimer(Element& x) } } -void TxPool::Stem::InsertKrn(Element& x) -{ - const Transaction& tx = *x.m_pValue; - x.m_vKrn.resize(tx.m_vKernels.size()); - - for (size_t i = 0; i < x.m_vKrn.size(); i++) - { - Element::Kernel& n = x.m_vKrn[i]; - n.m_pKrn = tx.m_vKernels[i].get(); - m_setKrns.insert(n); - n.m_pThis = &x; - } -} - void TxPool::Stem::Clear() { - while (!m_setKrns.empty()) - DeleteRaw(*m_setKrns.begin()->m_pThis); + while (!m_setProfit.empty()) + DeleteRaw(m_setProfit.begin()->get_ParentObj()); KillTimer(); } diff --git a/node/txpool.h b/node/txpool.h index d67b018bc..1858f9808 100644 --- a/node/txpool.h +++ b/node/txpool.h @@ -122,11 +122,9 @@ struct TxPool struct Stem { - struct Element { Transaction::Ptr m_pValue; - bool m_bAggregating; // if set - the tx isn't broadcasted yet, and inserted in the 'Profit' set struct Time :public boost::intrusive::set_base_hook<> @@ -144,31 +142,17 @@ struct TxPool IMPLEMENT_GET_PARENT_OBJ(Element, m_Profit) } m_Profit; - struct Kernel - :public boost::intrusive::set_base_hook<> - { - Element* m_pThis; - const TxKernel* m_pKrn; - bool operator < (const Kernel& t) const { return m_pKrn->m_Internal.m_ID < t.m_pKrn->m_Internal.m_ID; } - }; - Stats m_Stats; - - std::vector m_vKrn; }; - typedef boost::intrusive::multiset KrnSet; typedef boost::intrusive::multiset TimeSet; typedef boost::intrusive::multiset ProfitSet; - KrnSet m_setKrns; TimeSet m_setTime; ProfitSet m_setProfit; void Delete(Element&); void Clear(); - void InsertKrn(Element&); - void DeleteKrn(Element&); void InsertAggr(Element&); void DeleteAggr(Element&); void DeleteTimer(Element&); From 93abc941d1e1091f580f268c2672aef72d898f2f Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 11:33:33 +0200 Subject: [PATCH 09/12] Node: TxPool improvement, WIP (7) --- node/txpool.cpp | 18 +++++++++++------- node/txpool.h | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/node/txpool.cpp b/node/txpool.cpp index 00ca17cde..90d3ddbdc 100644 --- a/node/txpool.cpp +++ b/node/txpool.cpp @@ -77,7 +77,7 @@ TxPool::Fluff::Features TxPool::Fluff::Features::get(State s) { case State::Fluffed: ret.m_TxSet = true; - ret.m_Send = true; + ret.m_SendAndProfit = true; break; case State::PreFluffed: @@ -107,9 +107,9 @@ void TxPool::Fluff::SetState(Element& x, State s) void TxPool::Fluff::SetState(Element& x, Features f0, Features f) { - if (f.m_Send != f0.m_Send) + if (f.m_SendAndProfit != f0.m_SendAndProfit) { - if (f.m_Send) + if (f.m_SendAndProfit) { assert(!x.m_pSend); @@ -117,6 +117,8 @@ void TxPool::Fluff::SetState(Element& x, Features f0, Features f) x.m_pSend->m_Refs = 1; x.m_pSend->m_pThis = &x; m_SendQueue.push_back(*x.m_pSend); + + m_setProfit.insert(x.m_Profit); } else { @@ -125,15 +127,17 @@ void TxPool::Fluff::SetState(Element& x, Features f0, Features f) x.m_pSend->m_pThis = nullptr; Release(*x.m_pSend); x.m_pSend = nullptr; + + m_setProfit.erase(ProfitSet::s_iterator_to(x.m_Profit)); } } if (f.m_TxSet != f0.m_TxSet) { if (f.m_TxSet) - m_setProfit.insert(x.m_Profit); + m_setTxs.insert(x.m_Tx); else - m_setProfit.erase(ProfitSet::s_iterator_to(x.m_Profit)); + m_setTxs.erase(TxSet::s_iterator_to(x.m_Tx)); } SetStateHistOut(x, m_lstOutdated, f0.m_Outdated, f.m_Outdated); @@ -179,8 +183,8 @@ void TxPool::Fluff::Release(Element::Send& x) void TxPool::Fluff::Clear() { - while (!m_setProfit.empty()) - Delete(m_setProfit.begin()->get_ParentObj()); + while (!m_setTxs.empty()) + Delete(m_setTxs.begin()->get_ParentObj()); while (!m_lstOutdated.empty()) Delete(m_lstOutdated.begin()->get_ParentObj()); diff --git a/node/txpool.h b/node/txpool.h index 1858f9808..b754cee9e 100644 --- a/node/txpool.h +++ b/node/txpool.h @@ -106,7 +106,7 @@ struct TxPool struct Features { - bool m_Send; + bool m_SendAndProfit; bool m_TxSet; bool m_WaitFluff; bool m_Outdated; From 736900a8461f0e028a48864d60747bd6bca27559 Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 12:03:26 +0200 Subject: [PATCH 10/12] Node: TxPool improvement, WIP (8) --- node/node.cpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/node/node.cpp b/node/node.cpp index 14b760343..f991393a2 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -2828,6 +2828,8 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra void Node::OnTransactionFluff(TxPool::Fluff::Element& x, const PeerID* pSender) { + m_TxPool.SetState(x, TxPool::Fluff::State::Fluffed); + proto::HaveTransaction msgOut; msgOut.m_ID = x.m_Tx.m_Key; @@ -3084,7 +3086,17 @@ void Node::Peer::OnMsg(proto::HaveTransaction&& msg) TxPool::Fluff::TxSet::iterator it = m_This.m_TxPool.m_setTxs.find(key); if (m_This.m_TxPool.m_setTxs.end() != it) - return; // already have it + { + // already have it + TxPool::Fluff::Element& x = it->get_ParentObj(); + if (TxPool::Fluff::State::Fluffed != x.m_State) + { + const PeerID* pSender = m_pInfo ? &m_pInfo->m_ID.m_Key : nullptr; + m_This.OnTransactionFluff(x, pSender); + } + + return; + } if (!m_This.m_Wtx.Add(key.m_Key)) return; // already waiting for it From 7215bb258d033247aa490c05025fc481e69849d2 Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 14:04:22 +0200 Subject: [PATCH 11/12] Node: TxPool improvement: tuning and spam protection optimization --- node/node.cpp | 41 ++++++++++++++++++++++++++++++++--------- node/node.h | 8 ++++++-- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/node/node.cpp b/node/node.cpp index f991393a2..fb26179da 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -640,6 +640,7 @@ void Node::Processor::OnNewState() if (IsFastSync()) return; + get_ParentObj().m_TxReject.clear(); get_ParentObj().DeleteOutdated(); // Better to delete all irrelevant txs explicitly, even if the node is supposed to mine // because in practice mining could be OFF (for instance, if miner key isn't defined, and owner wallet is offline). get_ParentObj().m_TxDependent.Clear(); @@ -2234,8 +2235,15 @@ uint8_t Node::OnTransaction(Transaction::Ptr&& pTx, std::unique_ptrsecond; + } + Transaction::Context::Params pars; Transaction::Context ctx(pars); uint32_t nBvmCharge = 0; @@ -2245,11 +2253,18 @@ uint8_t Node::ValidateTx(TxPool::Stats& stats, const Transaction& tx, std::ostre if (proto::TxStatus::Ok == nRet) { if (AmountBig::get_Hi(ctx.m_Stats.m_Fee)) - return proto::TxStatus::LowFee; // actually it's ridiculously-high fee - - auto nSizeCorrection = (uint32_t) (((uint64_t) nBvmCharge) * Rules::get().MaxBodySize / bvm2::Limits::BlockCharge); + nRet = proto::TxStatus::LowFee; // actually it's ridiculously-high fee + else + { + auto nSizeCorrection = (uint32_t)(((uint64_t)nBvmCharge) * Rules::get().MaxBodySize / bvm2::Limits::BlockCharge); + stats.From(tx, ctx, feeReserve, nSizeCorrection); + } + } - stats.From(tx, ctx, feeReserve, nSizeCorrection); + if (proto::TxStatus::Ok != nRet) + { + m_TxReject[keyTx] = nRet; + m_Wtx.Delete(keyTx); } return nRet; @@ -2447,10 +2462,12 @@ uint8_t Node::OnTransactionStem(Transaction::Ptr&& ptx, std::ostream* pExtraInfo if (!bTested) { - uint8_t nCode = ValidateTx(stats, *ptx, pExtraInfo); + bool bAlreadyRejected = false; + uint8_t nCode = ValidateTx(stats, *ptx, keyTx, pExtraInfo, bAlreadyRejected); if (proto::TxStatus::Ok != nCode) { - LogTxStem(*ptx, "invalid"); + if (!bAlreadyRejected) + LogTxStem(*ptx, "invalid"); return nCode; } } @@ -2794,8 +2811,11 @@ uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, std::ostream* pExtra { const auto& pTxToTest = pF ? pF->m_pValue : ptx; // avoid ambiguity - uint8_t nCode = ValidateTx(stats, *pTxToTest, pExtraInfo); - LogTx(*pTxToTest, nCode, keyTx); + bool bAlreadyRejected = false; + uint8_t nCode = ValidateTx(stats, *pTxToTest, keyTx, pExtraInfo, bAlreadyRejected); + + if (!bAlreadyRejected) + LogTx(*pTxToTest, nCode, keyTx); if (proto::TxStatus::Ok != nCode) return nCode; @@ -3098,6 +3118,9 @@ void Node::Peer::OnMsg(proto::HaveTransaction&& msg) return; } + if (m_This.m_TxReject.end() != m_This.m_TxReject.find(msg.m_ID)) + return; + if (!m_This.m_Wtx.Add(key.m_Key)) return; // already waiting for it diff --git a/node/node.h b/node/node.h index 4fd7fa565..0a0b25298 100644 --- a/node/node.h +++ b/node/node.h @@ -134,7 +134,7 @@ struct Node struct Dandelion { uint16_t m_FluffProbability = 0x1999; // normalized wrt 16 bit. Equals to 0.1 - uint32_t m_dhStemConfirm = 5; // if stem tx is not mined within this number of blocks - it's auto-fluffed + uint32_t m_dhStemConfirm = 2; // if stem tx is not mined within this number of blocks (+1) - it's auto-fluffed uint32_t m_AggregationTime_ms = 10000; uint32_t m_OutputsMin = 5; // must be aggregated. @@ -216,6 +216,10 @@ struct Node // for step-by-step tests void GenerateFakeBlocks(uint32_t n); + TxPool::Fluff m_TxPool; + TxPool::Dependent m_TxDependent; + std::map m_TxReject; // spam + private: struct Processor @@ -400,7 +404,7 @@ struct Node Height SampleDummySpentHeight(); void DeleteOutdated(); - uint8_t ValidateTx(TxPool::Stats&, const Transaction&, std::ostream* pExtraInfo); // complete validation + uint8_t ValidateTx(TxPool::Stats&, const Transaction&, const Transaction::KeyType& keyTx, std::ostream* pExtraInfo, bool& bAlreadyRejected); // complete validation uint8_t ValidateTx2(Transaction::Context&, const Transaction&, uint32_t& nBvmCharge, Amount& feeReserve, TxPool::Dependent::Element* pParent, std::ostream* pExtraInfo); // complete validation static bool CalculateFeeReserve(const TxStats&, const HeightRange&, const AmountBig::Type&, uint32_t nBvmCharge, Amount& feeReserve); void LogTx(const Transaction&, uint8_t nStatus, const Transaction::KeyType&); From 318b98aed329fff7b3ab852e45fb9bd77ff7eb5a Mon Sep 17 00:00:00 2001 From: valdok Date: Mon, 20 Dec 2021 15:08:41 +0200 Subject: [PATCH 12/12] Manual fix of merge artefacts --- node/node.cpp | 45 ++++++++++++++++++++++++++++----------------- node/node.h | 21 ++++++++++++++++++--- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/node/node.cpp b/node/node.cpp index fb26179da..e3a86679b 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -2518,30 +2518,24 @@ uint8_t Node::OnTransactionStem(Transaction::Ptr&& ptx, std::ostream* pExtraInfo return proto::TxStatus::Ok; } -void Node::OnTransactionAggregated(TxPool::Stem::Element& x) +Node::Peer* Node::SelectRandomPeer(Peer::ISelector& sel) { - m_Dandelion.DeleteAggr(x); - LogTxStem(*x.m_pValue, "Aggregation finished"); - // must have at least 1 peer to continue the stem phase - uint32_t nStemPeers = 0; + uint32_t nCount = 0; for (PeerList::iterator it = m_lstPeers.begin(); m_lstPeers.end() != it; ++it) - if (it->m_LoginFlags & proto::LoginFlags::SpreadingTransactions) - nStemPeers++; + if (sel.IsValid(*it)) + nCount++; - if (nStemPeers) + if (nCount) { auto thr = uintBigFrom(m_Cfg.m_Dandelion.m_FluffProbability); // Compare two bytes of threshold with random nonce if (memcmp(thr.m_pData, NextNonce().m_pData, thr.nBytes) < 0) { - // broadcast to random peer - assert(nStemPeers); - // Choose random peer index between 0 and nStemPeers - 1 - uint32_t nRandomPeerIdx = RandomUInt32(nStemPeers); + uint32_t nRandomPeerIdx = RandomUInt32(nCount); for (PeerList::iterator it = m_lstPeers.begin(); ; ++it) if (sel.IsValid(*it) && !nRandomPeerIdx--) @@ -2572,9 +2566,6 @@ void Node::OnTransactionAggregated(Transaction::Ptr&& pTx, const TxPool::Stats& LogTxStem(*pTx, "Going to fluff"); OnTransactionFluff(std::move(pTx), nullptr, nullptr, &stats); } - - LogTxStem(*x.m_pValue, "Going to fluff"); - OnTransactionFluff(std::move(x.m_pValue), nullptr, nullptr, &x); } void Node::PerformAggregation(TxPool::Stem::Element& x) @@ -4055,6 +4046,7 @@ void Node::Miner::Initialize(IExternalPOW* externalPOW) if (!cfg.m_MiningThreads && !externalPOW) return; + m_LastRestart_ms = 0; m_pEvtMined = io::AsyncEvent::create(io::Reactor::get_Current(), [this]() { OnMined(); }); if (cfg.m_MiningThreads) { @@ -4144,7 +4136,7 @@ void Node::Miner::OnRefresh(uint32_t iIdx) for (uint32_t t0_ms = GetTime_ms(); !bSolved; ) { - if (fnCancel(false)) + if (fnCancel(true)) break; std::this_thread::sleep_for(std::chrono::milliseconds(50)); @@ -4219,10 +4211,27 @@ void Node::Miner::HardAbortSafe() } if (bHadTasks) - m_External.m_pSolver->stop_current(); + m_External.m_pSolver->stop_current(); } } +void Node::Miner::SoftRestart() +{ + if (!IsEnabled() || m_pTaskToFinalize) + return; + + uint32_t nTimeout_ms = 0; + if (m_LastRestart_ms) + { + uint32_t nMin_ms = get_ParentObj().m_Cfg.m_Timeout.m_MiningSoftRestart_ms; + uint32_t nElapsed_ms = GetTimeNnz_ms() - m_LastRestart_ms; + if (nElapsed_ms < nMin_ms) + nTimeout_ms = nMin_ms - nElapsed_ms; + } + + SetTimer(nTimeout_ms, false); +} + void Node::Miner::SetTimer(uint32_t timeout_ms, bool bHard) { if (!IsEnabled()) @@ -4246,6 +4255,8 @@ void Node::Miner::OnTimer() bool Node::Miner::Restart() { + m_LastRestart_ms = GetTimeNnz_ms(); + if (!IsEnabled()) return false; // n/a diff --git a/node/node.h b/node/node.h index 0a0b25298..5f0d0b44c 100644 --- a/node/node.h +++ b/node/node.h @@ -275,9 +275,6 @@ struct Node IMPLEMENT_GET_PARENT_OBJ(Node, m_Processor) } m_Processor; - TxPool::Fluff m_TxPool; - TxPool::Dependent m_TxDependent; - struct Peer; struct Task @@ -568,6 +565,19 @@ struct Node void SendHdrs(NodeDB::StateID&, uint32_t nCount); void SendTx(Transaction::Ptr& ptx, bool bFluff); + struct ISelector { + virtual bool IsValid(Peer&)= 0; + }; + + struct Selector_Stem :public ISelector { + static bool IsValid_(Peer& p) { + return !!(proto::LoginFlags::SpreadingTransactions & p.m_LoginFlags); + } + bool IsValid(Peer& p) override { + return IsValid_(p); + } + }; + // proto::NodeConnection virtual void OnConnectedSecure() override; virtual void OnDisconnect(const DisconnectReason&) override; @@ -631,6 +641,9 @@ struct Node uint32_t RandomUInt32(uint32_t threshold); + + Peer* SelectRandomPeer(Peer::ISelector&); + ECC::Scalar::Native m_MyPrivateID; PeerID m_MyPublicID; @@ -700,6 +713,7 @@ struct Node void Initialize(IExternalPOW* externalPOW=nullptr); + void SoftRestart(); void OnRefresh(uint32_t iIdx); void OnRefreshExternal(); void OnMined(); @@ -729,6 +743,7 @@ struct Node io::Timer::Ptr m_pTimer; bool m_bTimerPending = false; + uint32_t m_LastRestart_ms; Amount m_FeesTrg = 0; void OnTimer(); void SetTimer(uint32_t timeout_ms, bool bHard);