Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client/v2): broadcast logic #22282

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions client/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ require (
cosmossdk.io/api v0.7.6
cosmossdk.io/core v1.0.0-alpha.4
cosmossdk.io/depinject v1.0.0
cosmossdk.io/server/v2/cometbft v0.0.0-20241015140036-ee3d320eaa55
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
cosmossdk.io/x/bank v0.0.0-20240226161501-23359a0b6d91
cosmossdk.io/x/gov v0.0.0-20231113122742-912390d5fc4a
cosmossdk.io/x/tx v0.13.3
github.com/cosmos/cosmos-proto v1.0.0-beta.5
github.com/cosmos/cosmos-sdk v0.53.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
go.uber.org/mock v0.4.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
gotest.tools/v3 v3.5.1
Expand All @@ -27,7 +29,7 @@ require (
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/log v1.4.1 // indirect
cosmossdk.io/math v1.3.0
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac // indirect
cosmossdk.io/store v1.1.1-0.20240418092142-896cdf1971bc // indirect
cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
Expand All @@ -47,7 +49,7 @@ require (
github.com/cockroachdb/pebble v1.1.2 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/cometbft/cometbft v1.0.0-rc1.0.20240908111210-ab0be101882f // indirect
github.com/cometbft/cometbft v1.0.0-rc1.0.20240908111210-ab0be101882f
github.com/cometbft/cometbft-db v0.15.0 // indirect
github.com/cometbft/cometbft/api v1.0.0-rc.1 // indirect
github.com/cosmos/btcutil v1.0.5 // indirect
Expand Down Expand Up @@ -175,8 +177,12 @@ replace github.com/cosmos/cosmos-sdk => ./../../
// TODO remove post spinning out all modules
replace (
cosmossdk.io/api => ./../../api
cosmossdk.io/server/v2 => ./../../server/v2
cosmossdk.io/server/v2/cometbft => ./../../server/v2/cometbft
cosmossdk.io/store => ./../../store
cosmossdk.io/store/v2 => ./../../store/v2
cosmossdk.io/x/bank => ./../../x/bank
cosmossdk.io/x/consensus => ./../../x/consensus
cosmossdk.io/x/gov => ./../../x/gov
cosmossdk.io/x/staking => ./../../x/staking
cosmossdk.io/x/tx => ./../../x/tx
Expand Down
4 changes: 2 additions & 2 deletions client/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE=
cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 h1:DmOoS/1PeY6Ih0hAVlJ69kLMUrLV+TCbfICrZtB1vdU=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/x/protocolpool v0.0.0-20230925135524-a1bc045b3190 h1:XQJj9Dv9Gtze0l2TF79BU5lkP6MkUveTUuKICmxoz+o=
cosmossdk.io/x/protocolpool v0.0.0-20230925135524-a1bc045b3190/go.mod h1:7WUGupOvmlHJoIMBz1JbObQxeo6/TDiuDBxmtod8HRg=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
Expand Down
53 changes: 53 additions & 0 deletions client/v2/internal/broadcast/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package broadcast

import (
"context"
"fmt"
)

const (
// BroadcastSync defines a tx broadcasting mode where the client waits for
// a CheckTx execution response only.
BroadcastSync = "sync"
// BroadcastAsync defines a tx broadcasting mode where the client returns
// immediately.
BroadcastAsync = "async"

// cometBftConsensus is the identifier for the CometBFT consensus engine.
cometBftConsensus = "comet"
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
)

type (
// Broadcaster defines an interface for broadcasting transactions to the consensus engine.
Broadcaster interface {
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
// Broadcast sends a transaction to the network and returns the result.
//
// It returns a byte slice containing the formatted result that will be
// passed to the output writer, and an error if the broadcast failed.
Broadcast(ctx context.Context, txBytes []byte) ([]byte, error)
}

// factory defines a generic interface for creating a Broadcaster.
factory interface {
// Create creates a new Broadcaster instance of type T.
create(ctx context.Context, consensus, url string, opts ...Option) (Broadcaster, error)
}
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

// Option is a function that configures a Broadcaster.
Option func(Broadcaster)
)

var _ factory = broadcasterFactory{}

// broadcasterFactory is a factory for creating Broadcaster instances.
type broadcasterFactory struct{}

// create creates a new Broadcaster based on the given consensus type.
func (f broadcasterFactory) create(_ context.Context, consensus, url string, opts ...Option) (Broadcaster, error) {
switch consensus {
case cometBftConsensus:
return NewCometBftBroadcaster(url, opts...)
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
default:
return nil, fmt.Errorf("invalid consensus type: %s", consensus)
}
}
39 changes: 39 additions & 0 deletions client/v2/internal/broadcast/broadcaster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package broadcast

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func Test_newBroadcaster(t *testing.T) {
tests := []struct {
name string
consensus string
opts []Option
want Broadcaster
wantErr bool
}{
{
name: "comet",
consensus: "comet",
opts: []Option{
withMode(BroadcastSync),
},
want: &CometBftBroadcaster{},
},
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := broadcasterFactory{}.create(context.Background(), tt.consensus, "localhost:26657", tt.opts...)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
require.NotNil(t, got)
require.IsType(t, tt.want, got)
}
})
}
}
159 changes: 159 additions & 0 deletions client/v2/internal/broadcast/comet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package broadcast

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/cometbft/cometbft/mempool"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
cmttypes "github.com/cometbft/cometbft/types"

apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1"
serverrpc "cosmossdk.io/server/v2/cometbft/client/rpc"
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

"github.com/cosmos/cosmos-sdk/codec"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

var _ Broadcaster = CometBftBroadcaster{}

// CometBftBroadcaster implements the Broadcaster interface for CometBFT consensus engine.
type CometBftBroadcaster struct {
rpcClient serverrpc.CometRPC
mode string
cdc codec.JSONCodec
}

func withMode(mode string) func(broadcaster Broadcaster) {
return func(b Broadcaster) {
cbc, ok := b.(*CometBftBroadcaster)
if !ok {
return
}
cbc.mode = mode
}
}
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

func withJsonCodec(codec codec.JSONCodec) func(broadcaster Broadcaster) {
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
return func(b Broadcaster) {
cbc, ok := b.(*CometBftBroadcaster)
if !ok {
return
}
cbc.cdc = codec
}
}

// NewCometBftBroadcaster creates a new CometBftBroadcaster.
func NewCometBftBroadcaster(rpcURL string, opts ...Option) (*CometBftBroadcaster, error) {
rpcClient, err := rpchttp.New(rpcURL)
if err != nil {
return nil, fmt.Errorf("failed to create CometBft RPC client: %w", err)
}

bc := &CometBftBroadcaster{}
for _, opt := range opts {
opt(bc)
}

bc.rpcClient = *rpcClient
return bc, nil
}

// Broadcast sends a transaction to the network and returns the result.
// returns a byte slice containing the JSON-encoded result and an error if the broadcast failed.
func (c CometBftBroadcaster) Broadcast(ctx context.Context, txBytes []byte) ([]byte, error) {
var fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error)
switch c.mode {
case BroadcastSync:
fn = c.rpcClient.BroadcastTxSync
case BroadcastAsync:
fn = c.rpcClient.BroadcastTxAsync
default:
return []byte{}, fmt.Errorf("unknown broadcast mode: %s", c.mode)
}

res, err := c.broadcast(ctx, txBytes, fn)
if err != nil {
return []byte{}, err
}

return c.cdc.MarshalJSON(res)
}

// broadcast sends a transaction to the CometBFT network using the provided function.
func (c CometBftBroadcaster) broadcast(ctx context.Context, txbytes []byte,
fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error),
) (*apiacbci.TxResponse, error) {
bResult, err := fn(ctx, txbytes)
if errRes := checkCometError(err, txbytes); err != nil {
return errRes, nil
}
JulianToledano marked this conversation as resolved.
Show resolved Hide resolved

return newResponseFormatBroadcastTx(bResult), err
}

JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
// checkCometError checks for errors returned by the CometBFT network and returns an appropriate TxResponse.
// It extracts error information and constructs a TxResponse with the error details.
func checkCometError(err error, tx cmttypes.Tx) *apiacbci.TxResponse {
if err == nil {
return nil
}

errStr := strings.ToLower(err.Error())
txHash := fmt.Sprintf("%X", tx.Hash())

switch {
case strings.Contains(errStr, strings.ToLower(mempool.ErrTxInCache.Error())):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxInMempoolCache.ABCICode(),
Codespace: sdkerrors.ErrTxInMempoolCache.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "mempool is full"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrMempoolIsFull.ABCICode(),
Codespace: sdkerrors.ErrMempoolIsFull.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "tx too large"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxTooLarge.ABCICode(),
Codespace: sdkerrors.ErrTxTooLarge.Codespace(),
Txhash: txHash,
}

default:
return nil
}
}

// newResponseFormatBroadcastTx returns a TxResponse given a ResultBroadcastTx from cometbft
func newResponseFormatBroadcastTx(res *coretypes.ResultBroadcastTx) *apiacbci.TxResponse {
if res == nil {
return nil
}

parsedLogs, _ := parseABCILogs(res.Log)

JulianToledano marked this conversation as resolved.
Show resolved Hide resolved
return &apiacbci.TxResponse{
Code: res.Code,
Codespace: res.Codespace,
Data: res.Data.String(),
RawLog: res.Log,
Logs: parsedLogs,
Txhash: res.Hash.String(),
}
}

// parseABCILogs attempts to parse a stringified ABCI tx log into a slice of
// ABCIMessageLog types. It returns an error upon JSON decoding failure.
func parseABCILogs(logs string) (res []*apiacbci.ABCIMessageLog, err error) {
err = json.Unmarshal([]byte(logs), &res)
return res, err
}
Loading
Loading