Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(indexer/postgres)!: add basic support for header, txs and events #22695

Merged
merged 14 commits into from
Dec 9, 2024
4 changes: 4 additions & 0 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
app.logger.Error("Commit listening hook failed", "height", blockHeight, "err", err)
if app.streamingManager.StopNodeOnErr {
err = fmt.Errorf("Commit listening hook failed: %w", err)
if blockHeight == 1 {
// can't rollback to height 0, so just return the error
return nil, fmt.Errorf("failed to commit block 1, can't automatically rollback: %w", err)
}
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
rollbackErr := app.cms.RollbackToVersion(blockHeight - 1)
if rollbackErr != nil {
return nil, errors.Join(err, rollbackErr)
Expand Down
25 changes: 17 additions & 8 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp

import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
Expand Down Expand Up @@ -144,9 +145,10 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore
return exposeStoreKeys
}

func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
func eventToAppDataEvent(event abci.Event, height int64) (appdata.Event, error) {
appdataEvent := appdata.Event{
Type: event.Type,
BlockNumber: uint64(height),
Type: event.Type,
Attributes: func() ([]appdata.EventAttribute, error) {
attrs := make([]appdata.EventAttribute, len(event.Attributes))
for j, attr := range event.Attributes {
Expand Down Expand Up @@ -208,20 +210,27 @@ func NewListenerWrapper(listener appdata.Listener) listenerWrapper {

func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
if p.listener.StartBlock != nil {
// this is to avoid putting all txs along with the block data
reqWithoutTxs := req
reqWithoutTxs.Txs = nil

if err := p.listener.StartBlock(appdata.StartBlockData{
Height: uint64(req.Height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: func() (json.RawMessage, error) {
return json.Marshal(reqWithoutTxs)
},
}); err != nil {
return err
}
}
if p.listener.OnTx != nil {
for i, tx := range req.Txs {
if err := p.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
BlockNumber: uint64(req.Height),
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
Expand All @@ -231,14 +240,14 @@ func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.Finaliz
events := make([]appdata.Event, len(res.Events))
var err error
for i, event := range res.Events {
events[i], err = eventToAppDataEvent(event)
events[i], err = eventToAppDataEvent(event, req.Height)
if err != nil {
return err
}
}
for _, txResult := range res.TxResults {
for _, event := range txResult.Events {
appdataEvent, err := eventToAppDataEvent(event)
appdataEvent, err := eventToAppDataEvent(event, req.Height)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ require (
// TODO remove after all modules have their own go.mods
replace (
cosmossdk.io/api => ./api
cosmossdk.io/schema => ./schema
cosmossdk.io/store => ./store
cosmossdk.io/x/bank => ./x/bank
cosmossdk.io/x/staking => ./x/staking
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ cosmossdk.io/log v1.5.0 h1:dVdzPJW9kMrnAYyMf1duqacoidB9uZIl+7c6z0mnq0g=
cosmossdk.io/log v1.5.0/go.mod h1:Tr46PUJjiUthlwQ+hxYtUtPn4D/oCZXAkYevBeh5+FI=
cosmossdk.io/math v1.4.0 h1:XbgExXFnXmF/CccPPEto40gOO7FpWu9yWNAZPN3nkNQ=
cosmossdk.io/math v1.4.0/go.mod h1:O5PkD4apz2jZs4zqFdTr16e1dcaQCc5z6lkEnrrppuk=
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b h1:svpFdulZRrYz+RTHu2u9CeKkMKrIHx5354vjiHerovo=
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
Expand Down
15 changes: 9 additions & 6 deletions indexer/postgres/base_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ CREATE TABLE IF NOT EXISTS tx
id BIGSERIAL PRIMARY KEY,
block_number BIGINT NOT NULL REFERENCES block (number),
index_in_block BIGINT NOT NULL,
data JSONB NOT NULL
data JSONB NULL,
bytes BYTEA NULL
);

CREATE TABLE IF NOT EXISTS event
(
id BIGSERIAL PRIMARY KEY,
block_number BIGINT NOT NULL REFERENCES block (number),
tx_id BIGINT NULL REFERENCES tx (id),
msg_index BIGINT NULL,
event_index BIGINT NULL,
type TEXT NOT NULL,
data JSONB NOT NULL
block_stage INTEGER NOT NULL,
tx_index BIGINT NOT NULL,
msg_index BIGINT NOT NULL,
event_index BIGINT NOT NULL,
type TEXT NULL,
data JSONB NULL,
attributes JSONB NULL
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should store both data and attributes. I think if an event has Data, we should use that. If it doesn't have data and just has Attributes we should store that in data as JSON

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So basically I think we just need one data column, otherwise it's a lot of duplicated data in most cases

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make sure it's documented on the Event struct that Data and Attributes should represent the same thing and that Data is preferred by indexers if it is present.

);
`
65 changes: 65 additions & 0 deletions indexer/postgres/listener.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package postgres

import (
"encoding/json"
"fmt"

"cosmossdk.io/schema/appdata"
Expand Down Expand Up @@ -81,5 +82,69 @@ func (i *indexerImpl) listener() appdata.Listener {
i.tx, err = i.db.BeginTx(i.ctx, nil)
return nil, err
},
OnTx: func(td appdata.TxData) error {
var bz []byte
if td.Bytes != nil {
var err error
bz, err = td.Bytes()
if err != nil {
return err
}
}

var jsonData json.RawMessage
if td.JSON != nil {
var err error
jsonData, err = td.JSON()
if err != nil {
return err
}
}

_, err := i.tx.Exec("INSERT INTO tx (block_number, index_in_block, data, bytes) VALUES ($1, $2, $3, $4)",
td.BlockNumber, td.TxIndex, jsonData, bz)

return err
},
OnEvent: func(data appdata.EventData) error {
for _, e := range data.Events {
var (
jsonData json.RawMessage
jsonAttributes json.RawMessage
)

if e.Data != nil {
var err error
jsonData, err = e.Data()
if err != nil {
return fmt.Errorf("failed to get event data: %w", err)
}
}

if e.Attributes != nil {
attrs, err := e.Attributes()
if err != nil {
return fmt.Errorf("failed to get event attributes: %w", err)
}

attrsMap := map[string]interface{}{}
for _, attr := range attrs {
attrsMap[attr.Key] = attr.Value
}

jsonAttributes, err = json.Marshal(attrsMap)
if err != nil {
return fmt.Errorf("failed to marshal event attributes: %w", err)
}
}

_, err := i.tx.Exec("INSERT INTO event (block_number, block_stage, tx_index, msg_index, event_index, type, data, attributes) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
e.BlockNumber, e.BlockStage, e.TxIndex, e.MsgIndex, e.EventIndex, e.Type, jsonData, jsonAttributes)
if err != nil {
return fmt.Errorf("failed to index event: %w", err)
}
}
return nil
},
facundomedica marked this conversation as resolved.
Show resolved Hide resolved
}
}
6 changes: 6 additions & 0 deletions schema/appdata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type StartBlockData struct {

// TxData represents the raw transaction data that is passed to a listener.
type TxData struct {
// BlockNumber is the block number to which this event is associated.
BlockNumber uint64

// TxIndex is the index of the transaction in the block.
TxIndex int32

Expand All @@ -53,6 +56,9 @@ type Event struct {
// If the block stage is unknown, it should be set to UnknownBlockStage.
BlockStage BlockStage

// BlockNumber is the block number to which this event is associated.
BlockNumber uint64

// TxIndex is the 1-based index of the transaction in the block to which this event is associated.
// If TxIndex is zero, it means that we do not know the transaction index.
// Otherwise, the index should start with 1.
Expand Down
2 changes: 2 additions & 0 deletions simapp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ replace (
cosmossdk.io/api => ../api
cosmossdk.io/client/v2 => ../client/v2
cosmossdk.io/collections => ../collections
cosmossdk.io/schema => ../schema
cosmossdk.io/indexer/postgres => ../indexer/postgres
cosmossdk.io/store => ../store
cosmossdk.io/tools/confix => ../tools/confix
cosmossdk.io/x/accounts => ../x/accounts
Expand Down
Loading