Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: introduce mutex for state and lastCommitInfo to avoid race conditions #22692

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i

* (sims) [#21906](https://github.com/cosmos/cosmos-sdk/pull/21906) Skip sims test when running dry on validators
* (cli) [#21919](https://github.com/cosmos/cosmos-sdk/pull/21919) Query address-by-acc-num by account_id instead of id.
* (baseapp) [#22692](https://github.com/cosmos/cosmos-sdk/pull/22692) Add mutex locks for `state` and `lastCommitInfo` to prevent race conditions between `Commit` and `CreateQueryContext`.

### API Breaking Changes

Expand Down
72 changes: 41 additions & 31 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon
// initialize states with a correct header
app.setState(execModeFinalize, initHeader)
app.setState(execModeCheck, initHeader)
finalizeState := app.getState(execModeFinalize)

// Store the consensus params in the BaseApp's param store. Note, this must be
// done after the finalizeBlockState and context have been set as it's persisted
// to state.
if req.ConsensusParams != nil {
err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams)
err := app.StoreConsensusParams(finalizeState.Context(), *req.ConsensusParams)
if err != nil {
return nil, err
}
Expand All @@ -86,13 +87,14 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon
// handler, the block height is zero by default. However, after Commit is called
// the height needs to reflect the true block height.
initHeader.Height = req.InitialHeight
app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader).
checkState := app.getState(execModeCheck)
checkState.SetContext(checkState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
}))
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader).
finalizeState.SetContext(finalizeState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Expand All @@ -105,9 +107,9 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon
}

// add block gas meter for any genesis transactions (allow infinite gas)
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))
finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))

res, err := app.initChainer(app.finalizeBlockState.Context(), req)
res, err := app.initChainer(finalizeState.Context(), req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -604,7 +606,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.ExtendVoteRequest) (
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.Context().CacheContext()
ctx, _ = app.getState(execModeFinalize).Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -684,7 +686,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.Context().CacheContext()
ctx, _ = app.getState(execModeFinalize).Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -742,7 +744,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r

// internalFinalizeBlock executes the block, called by the Optimistic
// Execution flow or by the FinalizeBlock ABCI method. The context received is
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context()
// only used to handle early cancellation, for anything related to state app.getState(execModeFinalize).Context()
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) {
var events []abci.Event
Expand Down Expand Up @@ -773,12 +775,14 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// finalizeBlockState should be set on InitChain or ProcessProposal. If it is
// nil, it means we are replaying this block and we need to set the state here
// given that during block replay ProcessProposal is not executed by CometBFT.
if app.finalizeBlockState == nil {
finalizeState := app.getState(execModeFinalize)
if finalizeState == nil {
app.setState(execModeFinalize, header)
finalizeState = app.getState(execModeFinalize)
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
}

// Context is now updated with Header information.
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().
finalizeState.SetContext(finalizeState.Context().
WithBlockHeader(header).
WithHeaderHash(req.Hash).
WithHeaderInfo(coreheader.Info{
Expand All @@ -788,7 +792,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
Hash: req.Hash,
AppHash: app.LastCommitID().Hash,
}).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())).
WithConsensusParams(app.GetConsensusParams(finalizeState.Context())).
WithVoteInfos(req.DecidedLastCommit.Votes).
WithExecMode(sdk.ExecModeFinalize).
WithCometInfo(corecomet.Info{
Expand All @@ -799,11 +803,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
}))

// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
gasMeter := app.getBlockGasMeter(finalizeState.Context())
finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter))

if app.checkState != nil {
app.checkState.SetContext(app.checkState.Context().
if checkState := app.getState(execModeCheck); checkState != nil {
checkState.SetContext(checkState.Context().
WithBlockGasMeter(gasMeter).
WithHeaderHash(req.Hash))
}
Expand Down Expand Up @@ -831,8 +835,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
events = append(events, beginBlock.Events...)

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
gasMeter = app.getBlockGasMeter(finalizeState.Context())
finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter))

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
Expand Down Expand Up @@ -861,11 +865,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
txResults = append(txResults, response)
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
if finalizeState.ms.TracingEnabled() {
finalizeState.ms = finalizeState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
}

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
endBlock, err := app.endBlock(finalizeState.Context())
if err != nil {
return nil, err
}
Expand All @@ -879,7 +883,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
}

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
cp := app.GetConsensusParams(finalizeState.Context())

return &abci.FinalizeBlockResponse{
Events: events,
Expand All @@ -903,7 +907,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin
defer func() {
// call the streaming service hooks with the FinalizeBlock messages
for _, streamingListener := range app.streamingManager.ABCIListeners {
if streamErr := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); streamErr != nil {
if streamErr := streamingListener.ListenFinalizeBlock(app.getState(execModeFinalize).Context(), *req, *res); streamErr != nil {
app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err)
if app.streamingManager.StopNodeOnErr {
// if StopNodeOnErr is set, we should return the streamErr in order to stop the node
Expand All @@ -929,7 +933,10 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin
}

// if it was aborted, we need to reset the state
app.stateMut.Lock()
app.finalizeBlockState = nil
app.stateMut.Unlock()

app.optimisticExec.Reset()
}

Expand Down Expand Up @@ -968,11 +975,12 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error {
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
header := app.finalizeBlockState.Context().BlockHeader()
finalizeState := app.getState(execModeFinalize)
header := finalizeState.Context().BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

if app.precommiter != nil {
app.precommiter(app.finalizeBlockState.Context())
app.precommiter(finalizeState.Context())
}

rms, ok := app.cms.(*rootmulti.Store)
Expand All @@ -988,7 +996,7 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {

abciListeners := app.streamingManager.ABCIListeners
if len(abciListeners) > 0 {
ctx := app.finalizeBlockState.Context()
ctx := finalizeState.Context()
blockHeight := ctx.BlockHeight()
changeSet := app.cms.PopStateCache()

Expand All @@ -1013,10 +1021,12 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
// Commit. Use the header from this latest block.
app.setState(execModeCheck, header)

app.stateMut.Lock()
app.finalizeBlockState = nil
app.stateMut.Unlock()

if app.prepareCheckStater != nil {
app.prepareCheckStater(app.checkState.Context())
app.prepareCheckStater(app.getState(execModeCheck).Context())
}

// The SnapshotIfApplicable method will create the snapshot by starting the goroutine
Expand All @@ -1034,7 +1044,7 @@ func (app *BaseApp) workingHash() []byte {
// Write the FinalizeBlock state into branched storage and commit the MultiStore.
// The write to the FinalizeBlock state writes all state transitions to the root
// MultiStore (app.cms) so when Commit() is called it persists those values.
app.finalizeBlockState.ms.Write()
app.getState(execModeFinalize).ms.Write()

// Get the hash of all writes in order to return the apphash to the comet in finalizeBlock.
commitHash := app.cms.WorkingHash()
Expand Down Expand Up @@ -1181,7 +1191,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.QueryResponse {
// access any state changes made in InitChain.
func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context {
if height == app.initialHeight {
ctx, _ = app.finalizeBlockState.Context().CacheContext()
ctx, _ = app.getState(execModeFinalize).Context().CacheContext()

// clear all context data set during InitChain to avoid inconsistent behavior
ctx = ctx.WithHeaderInfo(coreheader.Info{}).WithBlockHeader(cmtproto.Header{})
Expand Down Expand Up @@ -1282,8 +1292,8 @@ func (app *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, check
var header *cmtproto.Header
isLatest := height == 0
for _, state := range []*state{
app.checkState,
app.finalizeBlockState,
app.getState(execModeCheck),
app.getState(execModeFinalize),
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
} {
if state != nil {
// branch the commit multi-store for safety
Expand Down Expand Up @@ -1396,7 +1406,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 {
// evidence parameters instead of computing an estimated number of blocks based
// on the unbonding period and block commitment time as the two should be
// equivalent.
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
cp := app.GetConsensusParams(app.getState(execModeFinalize).Context())
if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 {
retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks
}
Expand Down
49 changes: 49 additions & 0 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -2779,3 +2780,51 @@ func TestABCI_Proposal_FailReCheckTx(t *testing.T) {
require.NotEmpty(t, res.TxResults[0].Events)
require.True(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res))
}

func TestABCI_Race_Commit_Query(t *testing.T) {
suite := NewBaseAppSuite(t, baseapp.SetChainID("test-chain-id"))
app := suite.baseApp

_, err := app.InitChain(&abci.InitChainRequest{
ChainId: "test-chain-id",
ConsensusParams: &cmtproto.ConsensusParams{Block: &cmtproto.BlockParams{MaxGas: 5000000}},
InitialHeight: 1,
})
require.NoError(t, err)
_, err = app.Commit()
require.NoError(t, err)

counter := atomic.Uint64{}
counter.Store(0)

ctx, cancel := context.WithCancel(context.Background())
queryCreator := func() {
for {
select {
case <-ctx.Done():
return
default:
_, err := app.CreateQueryContextWithCheckHeader(0, false, false)
require.NoError(t, err)

counter.Add(1)
}
}
}

for i := 0; i < 100; i++ {
go queryCreator()
}
beer-1 marked this conversation as resolved.
Show resolved Hide resolved

for i := 0; i < 1000; i++ {
_, err = app.FinalizeBlock(&abci.FinalizeBlockRequest{Height: app.LastBlockHeight() + 1})
require.NoError(t, err)

_, err = app.Commit()
require.NoError(t, err)
}

cancel()

require.Equal(t, int64(1001), app.GetContextForCheckTx(nil).BlockHeight())
}
16 changes: 12 additions & 4 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type BaseApp struct {
prepareProposalState *state
processProposalState *state
finalizeBlockState *state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make notes on these *state variables that they should only be accessed and updated via the set and get methods instead of directly?

Alternatively creating a wrapper that does not actually expose the *state unless these getters or setters are called

stateMut sync.RWMutex

// An inter-block write-through cache provided to the context during the ABCI
// FinalizeBlock call.
Expand Down Expand Up @@ -494,6 +495,9 @@ func (app *BaseApp) setState(mode execMode, h cmtproto.Header) {
WithHeaderInfo(headerInfo),
}

app.stateMut.Lock()
defer app.stateMut.Unlock()

switch mode {
case execModeCheck:
baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices))
Expand Down Expand Up @@ -633,6 +637,9 @@ func validateBasicTxMsgs(router *MsgServiceRouter, msgs []sdk.Msg) error {
}

func (app *BaseApp) getState(mode execMode) *state {
app.stateMut.RLock()
defer app.stateMut.RUnlock()

switch mode {
case execModeFinalize:
return app.finalizeBlockState
Expand Down Expand Up @@ -706,7 +713,8 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, error) {
var events []abci.Event
if app.preBlocker != nil {
ctx := app.finalizeBlockState.Context().WithEventManager(sdk.NewEventManager())
finalizeState := app.getState(execModeFinalize)
ctx := finalizeState.Context().WithEventManager(sdk.NewEventManager())
if err := app.preBlocker(ctx, req); err != nil {
return nil, err
}
Expand All @@ -716,7 +724,7 @@ func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, erro
// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(ctx)
ctx = ctx.WithBlockGasMeter(gasMeter)
app.finalizeBlockState.SetContext(ctx)
finalizeState.SetContext(ctx)
events = ctx.EventManager().ABCIEvents()

// append PreBlock attributes to all events
Expand All @@ -738,7 +746,7 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er
)

if app.beginBlocker != nil {
resp, err = app.beginBlocker(app.finalizeBlockState.Context())
resp, err = app.beginBlocker(app.getState(execModeFinalize).Context())
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return resp, err
}
Expand Down Expand Up @@ -801,7 +809,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
var endblock sdk.EndBlock

if app.endBlocker != nil {
eb, err := app.endBlocker(app.finalizeBlockState.Context())
eb, err := app.endBlocker(app.getState(execModeFinalize).Context())
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return endblock, err
}
Expand Down
6 changes: 3 additions & 3 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ func (app *BaseApp) SimDeliver(txEncoder sdk.TxEncoder, tx sdk.Tx) (sdk.GasInfo,
// SimWriteState is an entrypoint for simulations only. They are not executed during the normal ABCI finalize
// block step but later. Therefore, an extra call to the root multi-store (app.cms) is required to write the changes.
func (app *BaseApp) SimWriteState() {
app.finalizeBlockState.ms.Write()
app.getState(execModeFinalize).ms.Write()
}

// NewContextLegacy returns a new sdk.Context with the provided header
func (app *BaseApp) NewContextLegacy(isCheckTx bool, header cmtproto.Header) sdk.Context {
if isCheckTx {
return sdk.NewContext(app.checkState.ms, true, app.logger).
return sdk.NewContext(app.getState(execModeCheck).ms, true, app.logger).
WithMinGasPrices(app.minGasPrices).
WithBlockHeader(header)
}

return sdk.NewContext(app.finalizeBlockState.ms, false, app.logger).WithBlockHeader(header)
return sdk.NewContext(app.getState(execModeFinalize).ms, false, app.logger).WithBlockHeader(header)
}

// NewContext returns a new sdk.Context with a empty header
Expand Down
Loading
Loading