Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client/v2): broadcast logic #22282

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/v2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#18461](https://github.com/cosmos/cosmos-sdk/pull/18461) Support governance proposals.
* [#20623](https://github.com/cosmos/cosmos-sdk/pull/20623) Introduce client/v2 tx factory.
* [#20623](https://github.com/cosmos/cosmos-sdk/pull/20623) Extend client/v2 keyring interface with `KeyType` and `KeyInfo`.
* [#22282](https://github.com/cosmos/cosmos-sdk/pull/22282) Added custom broadcast logic.

### Improvements

Expand Down
15 changes: 15 additions & 0 deletions client/v2/broadcast/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package broadcast

import "context"

// Broadcaster defines an interface for broadcasting transactions to the consensus engine.
type Broadcaster interface {
// Broadcast sends a transaction to the network and returns the result.
//
// It returns a byte slice containing the formatted result that will be
// passed to the output writer, and an error if the broadcast failed.
Broadcast(ctx context.Context, txBytes []byte) ([]byte, error)

// Consensus returns the consensus engine identifier for this Broadcaster.
Consensus() string
}
195 changes: 195 additions & 0 deletions client/v2/broadcast/comet/comet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package comet

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/cometbft/cometbft/mempool"
rpcclient "github.com/cometbft/cometbft/rpc/client"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
cmttypes "github.com/cometbft/cometbft/types"

apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1"
"cosmossdk.io/client/v2/broadcast"

"github.com/cosmos/cosmos-sdk/codec"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

const (
// BroadcastSync defines a tx broadcasting mode where the client waits for
// a CheckTx execution response only.
BroadcastSync = "sync"
// BroadcastAsync defines a tx broadcasting mode where the client returns
// immediately.
BroadcastAsync = "async"

// cometBftConsensus is the identifier for the CometBFT consensus engine.
cometBFTConsensus = "comet"
)

// CometRPC defines the interface of a CometBFT RPC client needed for
// queries and transaction handling.
type CometRPC interface {
rpcclient.ABCIClient

Validators(ctx context.Context, height *int64, page, perPage *int) (*coretypes.ResultValidators, error)
Status(context.Context) (*coretypes.ResultStatus, error)
Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error)
BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error)
BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error)
BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*coretypes.ResultBlockchainInfo, error)
Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error)
Tx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error)
TxSearch(
ctx context.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*coretypes.ResultTxSearch, error)
BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*coretypes.ResultBlockSearch, error)
}

var _ broadcast.Broadcaster = CometBFTBroadcaster{}

// CometBFTBroadcaster implements the Broadcaster interface for CometBFT consensus engine.
type CometBFTBroadcaster struct {
rpcClient CometRPC
mode string
cdc codec.JSONCodec
}

// NewCometBFTBroadcaster creates a new CometBftBroadcaster.
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
func NewCometBFTBroadcaster(rpcURL, mode string, cdc codec.JSONCodec) (*CometBFTBroadcaster, error) {
if cdc == nil {
return nil, errors.New("codec can't be nil")
}

if mode == "" {
mode = BroadcastSync
}

rpcClient, err := rpchttp.New(rpcURL)
if err != nil {
return nil, fmt.Errorf("failed to create CometBft RPC client: %w", err)
}

return &CometBFTBroadcaster{
rpcClient: rpcClient,
mode: mode,
cdc: cdc,
}, nil
}

func (c CometBFTBroadcaster) Consensus() string {
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
return cometBFTConsensus
}

// Broadcast sends a transaction to the network and returns the result.
// returns a byte slice containing the JSON-encoded result and an error if the broadcast failed.
func (c CometBFTBroadcaster) Broadcast(ctx context.Context, txBytes []byte) ([]byte, error) {
if c.cdc == nil {
return []byte{}, fmt.Errorf("JSON codec is not initialized")
}

var fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error)
switch c.mode {
case BroadcastSync:
fn = c.rpcClient.BroadcastTxSync
case BroadcastAsync:
fn = c.rpcClient.BroadcastTxAsync
default:
return []byte{}, fmt.Errorf("unknown broadcast mode: %s", c.mode)
}
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

res, err := c.broadcast(ctx, txBytes, fn)
if err != nil {
return []byte{}, err
}

return c.cdc.MarshalJSON(res)
}
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

// broadcast sends a transaction to the CometBFT network using the provided function.
func (c CometBFTBroadcaster) broadcast(ctx context.Context, txbytes []byte,
fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error),
) (*apiacbci.TxResponse, error) {
bResult, err := fn(ctx, txbytes)
if errRes := checkCometError(err, txbytes); err != nil {
return errRes, nil
}
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

return newResponseFormatBroadcastTx(bResult), err
}
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

// checkCometError checks for errors returned by the CometBFT network and returns an appropriate TxResponse.
// It extracts error information and constructs a TxResponse with the error details.
func checkCometError(err error, tx cmttypes.Tx) *apiacbci.TxResponse {
if err == nil {
return nil
}

errStr := strings.ToLower(err.Error())
txHash := fmt.Sprintf("%X", tx.Hash())

switch {
case strings.Contains(errStr, strings.ToLower(mempool.ErrTxInCache.Error())):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxInMempoolCache.ABCICode(),
Codespace: sdkerrors.ErrTxInMempoolCache.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "mempool is full"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrMempoolIsFull.ABCICode(),
Codespace: sdkerrors.ErrMempoolIsFull.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "tx too large"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxTooLarge.ABCICode(),
Codespace: sdkerrors.ErrTxTooLarge.Codespace(),
Txhash: txHash,
}

default:
return nil
}
}

// newResponseFormatBroadcastTx returns a TxResponse given a ResultBroadcastTx from cometbft
func newResponseFormatBroadcastTx(res *coretypes.ResultBroadcastTx) *apiacbci.TxResponse {
if res == nil {
return nil
}

parsedLogs, _ := parseABCILogs(res.Log)

return &apiacbci.TxResponse{
Code: res.Code,
Codespace: res.Codespace,
Data: res.Data.String(),
RawLog: res.Log,
Logs: parsedLogs,
Txhash: res.Hash.String(),
}
}

// parseABCILogs attempts to parse a stringified ABCI tx log into a slice of
// ABCIMessageLog types. It returns an error upon JSON decoding failure.
func parseABCILogs(logs string) (res []*apiacbci.ABCIMessageLog, err error) {
err = json.Unmarshal([]byte(logs), &res)
return res, err
}
133 changes: 133 additions & 0 deletions client/v2/broadcast/comet/comet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package comet

import (
"context"
"errors"
"testing"

"github.com/cometbft/cometbft/mempool"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1"
mockrpc "cosmossdk.io/client/v2/broadcast/comet/testutil"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/codec/testutil"
)

var cdc = testutil.CodecOptions{}.NewCodec()

func TestNewCometBftBroadcaster(t *testing.T) {
tests := []struct {
name string
cdc codec.JSONCodec
mode string
want *CometBFTBroadcaster
wantErr bool
}{
{
name: "constructor",
mode: BroadcastSync,
cdc: cdc,
want: &CometBFTBroadcaster{
mode: BroadcastSync,
cdc: cdc,
},
},
{
name: "nil codec",
mode: BroadcastSync,
cdc: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewCometBFTBroadcaster("localhost:26657", tt.mode, tt.cdc)
if tt.wantErr {
require.Error(t, err)
require.Nil(t, got)
} else {
require.Equal(t, got.mode, tt.want.mode)
require.Equal(t, got.cdc, tt.want.cdc)
}
})
}
}

func TestCometBftBroadcaster_Broadcast(t *testing.T) {
ctrl := gomock.NewController(t)
cometMock := mockrpc.NewMockCometRPC(ctrl)
c := CometBFTBroadcaster{
rpcClient: cometMock,
mode: BroadcastSync,
cdc: cdc,
}
tests := []struct {
name string
mode string
want []byte
wantErr bool
}{
{
name: "sync",
mode: BroadcastSync,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c.mode = tt.mode
cometMock.EXPECT().BroadcastTxSync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
got, err := c.Broadcast(context.Background(), []byte{})
if (err != nil) != tt.wantErr {
t.Errorf("Broadcast() error = %v, wantErr %v", err, tt.wantErr)
return
}
require.NotNil(t, got)
})
}
}
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

func Test_checkCometError(t *testing.T) {
tests := []struct {
name string
err error
want *apiacbci.TxResponse
}{
{
name: "error in cache",
err: errors.New("tx already exists in cache"),
want: &apiacbci.TxResponse{
Code: 19,
},
},
{
name: "error in cache",
err: mempool.ErrMempoolIsFull{},
want: &apiacbci.TxResponse{
Code: 20,
},
},
{
name: "error in cache",
err: mempool.ErrTxTooLarge{},
want: &apiacbci.TxResponse{
Code: 21,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := checkCometError(tt.err, []byte{})
require.Equal(t, got.Code, tt.want.Code)
})
}
}
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading