Skip to content

Commit

Permalink
[ENG-633] Address audit comments and issues (#126)
Browse files Browse the repository at this point in the history
* Address audit sidecar comments

* [ENG-633] Address remainder of audit comments (#131)

* Address remainder of audit comments

* Address issue 4

* [ENG-633] Keep a sidecar maxBundleID per height (#132)

* Keep a sidecar maxBundleID per height

* Fix linter issues (#133)

* More linter fixes
  • Loading branch information
wllmshao committed Mar 29, 2023
1 parent 17fd8bb commit 778fb26
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ linters:
- gosimple
- govet
- ineffassign
- misspell
# - misspell
- nakedret
- nolintlint
# - nolintlint
- prealloc
- staticcheck
# - structcheck // to be fixed by golangci-lint
Expand Down
40 changes: 15 additions & 25 deletions mempool/clist_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
// be efficiently accessed by multiple concurrent readers.
type CListPriorityTxSidecar struct {
// Atomic integers
height int64 // the last block Update()'d to
heightForFiringAuction int64 // the height of the block to fire the auction for
txsBytes int64 // total size of sidecar, in bytes
lastBundleHeight int64 // height of last accepted bundle tx, for status rpc purposes
Expand All @@ -40,7 +39,7 @@ type CListPriorityTxSidecar struct {
// // sync.Map bundleOrder -> *SidecarTx
// }
bundles sync.Map
maxBundleID int64
maxBundleID map[int64]int64 // map of height -> max bundle ID

updateMtx tmsync.RWMutex

Expand Down Expand Up @@ -71,18 +70,18 @@ func NewCListSidecar(
) *CListPriorityTxSidecar {
sidecar := &CListPriorityTxSidecar{
txs: clist.New(),
height: height,
heightForFiringAuction: height + 1,
logger: memLogger,
metrics: mevMetrics,
maxBundleID: make(map[int64]int64),
}
sidecar.cache = NewLRUTxCache(10000)
return sidecar
}

func (sc *CListPriorityTxSidecar) PrettyPrintBundles() {
fmt.Println("-------------")
for bundleIDIter := 0; bundleIDIter <= int(sc.maxBundleID); bundleIDIter++ {
for bundleIDIter := 0; bundleIDIter <= int(sc.maxBundleID[sc.heightForFiringAuction]); bundleIDIter++ {
bundleIDIter := int64(bundleIDIter)
if bundle, ok := sc.bundles.Load(Key{sc.heightForFiringAuction, bundleIDIter}); ok {
bundle := bundle.(*Bundle)
Expand Down Expand Up @@ -134,7 +133,6 @@ func (sc *CListPriorityTxSidecar) notifyTxsAvailable() {
func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error {

sc.updateMtx.RLock()
// use defer to unlock mutex because application (*local client*) might panic
defer sc.updateMtx.RUnlock()

// don't add any txs already in cache
Expand All @@ -148,9 +146,7 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error {
)
// Record a new sender for a tx we've already seen.
// Note it's possible a tx is still in the cache but no longer in the mempool
// (eg. after committing a block, txs are removed from mempool but not cache),
// so we only record the sender for txs still in the mempool.
// Record a new sender for a tx we've already seen.

if e, ok := sc.txsMap.Load(tx.Key()); ok {
scTx := e.(*clist.CElement).Value.(*SidecarTx)
Expand Down Expand Up @@ -218,8 +214,6 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error {
BundleID: txInfo.BundleID,
CurrSize: int64(0),
EnforcedSize: txInfo.BundleSize,
// TODO: add from gossip info?
GasWanted: int64(0),
OrderedTxsMap: &sync.Map{},
})
bundle = existingBundle.(*Bundle)
Expand All @@ -230,7 +224,7 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error {
if txInfo.BundleSize != bundle.EnforcedSize {
sc.logger.Info(
"failed adding sidecarTx",
"reason", "trying to insert a tx for bundle at an order greater than the size of the bundle...",
"reason", "tx's bundle size doesn't match bundle's expected size...",
"bundle id", txInfo.BundleID,
"bundle size", txInfo.BundleSize,
"gasWanted", txInfo.GasWanted,
Expand All @@ -251,7 +245,7 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error {
defer sc.bundleSizeMtx.Unlock()
// Can't add transactions if the bundle is already full
// check if the current size of this bundle is greater than the expected size for the bundle, if so skip
if bundle.CurrSize >= bundle.EnforcedSize {
if bundle.CurrSize == bundle.EnforcedSize {
sc.logger.Info(
"failed adding sidecarTx",
"reason", "bundle already full for this BundleID...",
Expand Down Expand Up @@ -296,13 +290,11 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error {

// -------- UPDATE MAX BUNDLE ---------

func() {
sc.maxBundleIDMtx.Lock()
defer sc.maxBundleIDMtx.Unlock()
if txInfo.BundleID >= sc.maxBundleID {
sc.maxBundleID = txInfo.BundleID
}
}()
sc.maxBundleIDMtx.Lock()
if txInfo.BundleID > sc.maxBundleID[txInfo.DesiredHeight] {
sc.maxBundleID[txInfo.DesiredHeight] = txInfo.BundleID
}
sc.maxBundleIDMtx.Unlock()

// -------- TX INSERTION INTO MAIN TXS LIST ---------
// -------- TODO: In the future probably want to refactor to not have txs clist ---------
Expand Down Expand Up @@ -368,9 +360,6 @@ func (sc *CListPriorityTxSidecar) Update(
txs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
) error {

// Set height for block last updated to (i.e. block last committed)
sc.height = height
sc.notifiedTxsAvailable = false
sc.heightForFiringAuction = height + 1

Expand All @@ -396,7 +385,7 @@ func (sc *CListPriorityTxSidecar) Update(
}

sc.cache.Reset()
sc.maxBundleID = 0
delete(sc.maxBundleID, height)

// remove from txs list and txmap
for e := sc.txs.Front(); e != nil; e = e.Next() {
Expand Down Expand Up @@ -444,7 +433,7 @@ func (sc *CListPriorityTxSidecar) Flush() {
sc.cache.Reset()

sc.notifiedTxsAvailable = false
sc.maxBundleID = 0
delete(sc.maxBundleID, sc.heightForFiringAuction)

_ = atomic.SwapInt64(&sc.txsBytes, 0)

Expand Down Expand Up @@ -481,7 +470,7 @@ func (sc *CListPriorityTxSidecar) NumBundles() int {

// Safe for concurrent use by multiple goroutines.
func (sc *CListPriorityTxSidecar) MaxBundleID() int64 {
return sc.maxBundleID
return sc.maxBundleID[sc.heightForFiringAuction]
}

func (sc *CListPriorityTxSidecar) HeightForFiringAuction() int64 {
Expand Down Expand Up @@ -557,7 +546,7 @@ func (sc *CListPriorityTxSidecar) ReapMaxTxs() types.ReapedTxs {
// iterate over all BundleIDs up to the max we've seen
// CONTRACT: this assumes that bundles don't care about previous bundles,
// so still want to execute if any missing between
for bundleIDIter := 0; bundleIDIter <= int(sc.maxBundleID); bundleIDIter++ {
for bundleIDIter := 0; bundleIDIter <= int(sc.maxBundleID[sc.heightForFiringAuction]); bundleIDIter++ {
bundleIDIter := int64(bundleIDIter)

if bundle, ok := sc.bundles.Load(Key{sc.heightForFiringAuction, bundleIDIter}); ok {
Expand Down Expand Up @@ -593,6 +582,7 @@ func (sc *CListPriorityTxSidecar) ReapMaxTxs() types.ReapedTxs {
"bundleOrder", bundleOrderIter,
"height", sc.heightForFiringAuction,
)
break
}
}

Expand Down
1 change: 0 additions & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,5 @@ type Bundle struct {
CurrSize int64 // total size of bundle
EnforcedSize int64 // total size of bundle

GasWanted int64 // amount of gas this tx states it will require
OrderedTxsMap *sync.Map // map from bundleOrder to *mempoolTx
}
12 changes: 2 additions & 10 deletions mempool/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (memR *Reactor) broadcastSidecarTxRoutine(peer p2p.Peer) {
GasWanted: scTx.GasWanted,
}

success := p2p.SendEnvelopeShim(peer, p2p.Envelope{
success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: mempool.SidecarLegacyChannel,
Message: msg,
}, memR.Logger)
Expand All @@ -392,14 +392,6 @@ func (memR *Reactor) broadcastSidecarTxRoutine(peer p2p.Peer) {
continue
}
}
} else {
memR.Logger.Info(
"broadcasting sidecarTx to peer failed",
"peer", peerID,
"was considered sidecarPeer", isSidecarPeer,
"was converted to sidecarTx", okConv,
"tx", scTx.Tx.Hash(),
)
}
}

Expand Down Expand Up @@ -454,7 +446,7 @@ func (memR *Reactor) broadcastMempoolTxRoutine(peer p2p.Peer) {

// Allow for a lag of 1 block.
memTx := next.Value.(*mempoolTx)
if peerState.GetHeight() < memTx.height-1 {
if peerState.GetHeight() < memTx.Height()-1 {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
}
Expand Down
10 changes: 1 addition & 9 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (memR *Reactor) broadcastSidecarTxRoutine(peer p2p.Peer) {
GasWanted: scTx.GasWanted,
}

success := p2p.SendEnvelopeShim(peer, p2p.Envelope{
success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: mempool.SidecarLegacyChannel,
Message: msg,
}, memR.Logger)
Expand All @@ -395,14 +395,6 @@ func (memR *Reactor) broadcastSidecarTxRoutine(peer p2p.Peer) {
continue
}
}
} else {
memR.Logger.Info(
"broadcasting sidecarTx to peer failed",
"peer", peerID,
"was considered sidecarPeer", isSidecarPeer,
"was converted to sidecarTx", okConv,
"tx", scTx.Tx.Hash(),
)
}
}

Expand Down
11 changes: 5 additions & 6 deletions p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type Switch struct {
rng *rand.Rand // seed for randomizing dial times and orders

metrics *Metrics
mlc *metricsLabelCache
mlc *metricsLabelCache
mevMetrics *mev.Metrics
sidecarPeers sidecarPeers
SentinelPeerString string
Expand Down Expand Up @@ -463,10 +463,6 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
// https://github.com/tendermint/tendermint/issues/3338
if sw.peers.Remove(peer) {
sw.metrics.Peers.Add(float64(-1))
} else {
// Removal of the peer has failed. The function above sets a flag within the peer to mark this.
// We keep this message here as information to the developer.
sw.Logger.Debug("error on peer removal", ",", "peer", peer.ID())

// check if we removed sentinel, if so, alert metrics
splitStr := strings.Split(sw.SentinelPeerString, "@")
Expand All @@ -482,7 +478,10 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
} else {
sw.Logger.Error("Error splitting sentinel ID", "is it correctly configured?", sw.SentinelPeerString)
}

} else {
// Removal of the peer has failed. The function above sets a flag within the peer to mark this.
// We keep this message here as information to the developer.
sw.Logger.Debug("error on peer removal", ",", "peer", peer.ID())
}
}

Expand Down
7 changes: 2 additions & 5 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ func (blockExec *BlockExecutor) Commit(
deliverTxResponses []*abci.ResponseDeliverTx,
) ([]byte, int64, error) {
blockExec.mempool.Lock()
blockExec.sidecar.Lock()
defer blockExec.mempool.Unlock()
defer blockExec.sidecar.Unlock()

// while mempool is Locked, flush to ensure all async requests have completed
// in the ABCI app before Commit.
Expand Down Expand Up @@ -258,7 +260,6 @@ func (blockExec *BlockExecutor) Commit(
)
if err != nil {
blockExec.logger.Error("error while updating sidecar", "err", err)
return nil, 0, err
}
}

Expand All @@ -270,10 +271,6 @@ func (blockExec *BlockExecutor) Commit(
TxPreCheck(state),
TxPostCheck(state),
)
if err != nil {
blockExec.logger.Error("error while updating mempool", "err", err)
return nil, 0, err
}

return res.Data, res.RetainHeight, err
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/runner/setup.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//nolint: gosec
// nolint: gosec
package main

import (
Expand Down
2 changes: 1 addition & 1 deletion test/fuzz/p2p/pex/init-corpus/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//nolint: gosec
// nolint: gosec
package main

import (
Expand Down

0 comments on commit 778fb26

Please sign in to comment.