Skip to content

Commit

Permalink
abci: Add relaxed local client synchronization models (cometbft#1141) (
Browse files Browse the repository at this point in the history
…#2)

* proxy: Remove "unsynchronized" local client creator



* proxy: Expand client creator interface

Expand the `ClientCreator` interface to allow the caller to explicitly
specify the "connection" whose client they are creating. This
potentially gives greater control over the concurrency model employed in
each type of connection.



* abci/client: Clarify NewLocalClient description



* proxy: Add connection-synchronized local client creator

Analogous to the old "unsynchronized" local client creator.



* abci/client: Add unsynchronized local client



* proxy: Add consensus-synchronized local client creator



* proxy: Fix mock configuration in test



* Add changelog entries



* Remove changelog entry - no longer necessary



* proxy: Add unsynchronized local client creator



* changelog: Add entry for unsync local client creator



* Update 1141-abci-unsync-proxy.md



---------

Signed-off-by: Thane Thomson <[email protected]>
Co-authored-by: Thane Thomson <[email protected]>
Co-authored-by: Adi Seredinschi <[email protected]>
  • Loading branch information
3 people authored Nov 1, 2024
1 parent 8136e47 commit cee5fa0
Show file tree
Hide file tree
Showing 14 changed files with 484 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[proxy]` Expand `ClientCreator` interface to allow
for per-"connection" control of client creation
([\#1141](https://github.com/cometbft/cometbft/pull/1141))
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[abci/client]` Add consensus-synchronized local client creator,
which only imposes a mutex on the consensus "connection", leaving
the concurrency of all other "connections" up to the application
([\#1141](https://github.com/cometbft/cometbft/pull/1141))
3 changes: 3 additions & 0 deletions .changelog/v0.38.0/improvements/1141-abci-unsync-proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[abci/client]` Add fully unsynchronized local client creator, which
imposes no mutexes on the application, leaving all handling of concurrency up
to the application ([\#1141](https://github.com/cometbft/cometbft/pull/1141))
16 changes: 10 additions & 6 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ type localClient struct {

var _ Client = (*localClient)(nil)

// NewLocalClient creates a local client, which wraps the application interface that
// Tendermint as the client will call to the application as the server. The only
// difference, is that the local client has a global mutex which enforces serialization
// of all the ABCI calls from Tendermint to the Application.
// NewLocalClient creates a local client, which wraps the application interface
// that Comet as the client will call to the application as the server.
//
// Concurrency control in each client instance is enforced by way of a single
// mutex. If a mutex is not supplied (i.e. if mtx is nil), then one will be
// created.
func NewLocalClient(mtx *cmtsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = new(cmtsync.Mutex)
Expand Down Expand Up @@ -135,15 +137,17 @@ func (app *localClient) OfferSnapshot(ctx context.Context, req *types.RequestOff
}

func (app *localClient) LoadSnapshotChunk(ctx context.Context,
req *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
req *types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.LoadSnapshotChunk(ctx, req)
}

func (app *localClient) ApplySnapshotChunk(ctx context.Context,
req *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
req *types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

Expand Down
136 changes: 136 additions & 0 deletions abci/client/unsync_local_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package abcicli

import (
"context"
"sync"

types "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/service"
)

type unsyncLocalClient struct {
service.BaseService

types.Application

mtx sync.Mutex
Callback
}

var _ Client = (*unsyncLocalClient)(nil)

// NewUnsyncLocalClient creates a local client, which wraps the application
// interface that Comet as the client will call to the application as the
// server.
//
// This differs from [NewLocalClient] in that it returns a client that only
// maintains a mutex over the callback used by CheckTxAsync and not over the
// application, leaving it up to the proxy to handle all concurrency. If the
// proxy does not impose any concurrency restrictions, it is then left up to
// the application to implement its own concurrency for the relevant group of
// calls.
func NewUnsyncLocalClient(app types.Application) Client {
cli := &unsyncLocalClient{
Application: app,
}
cli.BaseService = *service.NewBaseService(nil, "unsyncLocalClient", cli)
return cli
}

func (app *unsyncLocalClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
app.Callback = cb
app.mtx.Unlock()
}

func (app *unsyncLocalClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := app.Application.CheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
), nil
}

func (app *unsyncLocalClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
rr := newLocalReqRes(req, res)
rr.callbackInvoked = true
return rr
}

//-------------------------------------------------------

func (app *unsyncLocalClient) Error() error {
return nil
}

func (app *unsyncLocalClient) Flush(context.Context) error {
return nil
}

func (app *unsyncLocalClient) Echo(_ context.Context, msg string) (*types.ResponseEcho, error) {
return &types.ResponseEcho{Message: msg}, nil
}

func (app *unsyncLocalClient) Info(ctx context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) {
return app.Application.Info(ctx, req)
}

func (app *unsyncLocalClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return app.Application.CheckTx(ctx, req)
}

func (app *unsyncLocalClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
return app.Application.Query(ctx, req)
}

func (app *unsyncLocalClient) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) {
return app.Application.Commit(ctx, req)
}

func (app *unsyncLocalClient) InitChain(ctx context.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) {
return app.Application.InitChain(ctx, req)
}

func (app *unsyncLocalClient) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
return app.Application.ListSnapshots(ctx, req)
}

func (app *unsyncLocalClient) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
return app.Application.OfferSnapshot(ctx, req)
}

func (app *unsyncLocalClient) LoadSnapshotChunk(ctx context.Context,
req *types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
return app.Application.LoadSnapshotChunk(ctx, req)
}

func (app *unsyncLocalClient) ApplySnapshotChunk(ctx context.Context,
req *types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
return app.Application.ApplySnapshotChunk(ctx, req)
}

func (app *unsyncLocalClient) PrepareProposal(ctx context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
return app.Application.PrepareProposal(ctx, req)
}

func (app *unsyncLocalClient) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
return app.Application.ProcessProposal(ctx, req)
}

func (app *unsyncLocalClient) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (*types.ResponseExtendVote, error) {
return app.Application.ExtendVote(ctx, req)
}

func (app *unsyncLocalClient) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
return app.Application.VerifyVoteExtension(ctx, req)
}

func (app *unsyncLocalClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
return app.Application.FinalizeBlock(ctx, req)
}
2 changes: 1 addition & 1 deletion consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newMockProxyApp(finalizeBlockResponse *abci.ResponseFinalizeBlock) proxy.Ap
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
finalizeBlockResponse: finalizeBlockResponse,
})
cli, _ := clientCreator.NewABCIClient()
cli, _ := clientCreator.NewABCIConsensusClient()
err := cli.Start()
if err != nil {
panic(err)
Expand Down
6 changes: 3 additions & 3 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) {
}

func newMempoolWithAppAndConfig(cc proxy.ClientCreator, cfg *config.Config) (*CListMempool, cleanupFunc) {
appConnMem, _ := cc.NewABCIClient()
appConnMem, _ := cc.NewABCIMempoolClient()
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
err := appConnMem.Start()
if err != nil {
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestSerialReap(t *testing.T) {
mp, cleanup := newMempoolWithApp(cc)
defer cleanup()

appConnCon, _ := cc.NewABCIClient()
appConnCon, _ := cc.NewABCIConsensusClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
err := appConnCon.Start()
require.Nil(t, err)
Expand Down Expand Up @@ -629,7 +629,7 @@ func TestMempoolTxsBytes(t *testing.T) {
require.NoError(t, err)
assert.EqualValues(t, 10, mp.SizeBytes())

appConnCon, _ := cc.NewABCIClient()
appConnCon, _ := cc.NewABCIConsensusClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
err = appConnCon.Start()
require.Nil(t, err)
Expand Down
4 changes: 2 additions & 2 deletions proxy/app_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestEcho(t *testing.T) {
})

// Start client
cli, err := clientCreator.NewABCIClient()
cli, err := clientCreator.NewABCIMempoolClient()
if err != nil {
t.Fatalf("Error creating ABCI client: %v", err.Error())
}
Expand Down Expand Up @@ -72,7 +72,7 @@ func BenchmarkEcho(b *testing.B) {
})

// Start client
cli, err := clientCreator.NewABCIClient()
cli, err := clientCreator.NewABCIMempoolClient()
if err != nil {
b.Fatalf("Error creating ABCI client: %v", err.Error())
}
Expand Down
Loading

0 comments on commit cee5fa0

Please sign in to comment.