Ingestion Performance improvements #653

Merged
20 changes: 15 additions & 5 deletions cmd/run/cmd.go
@@ -28,7 +28,7 @@ import (
var Cmd = &cobra.Command{
Use: "run",
Short: "Runs the EVM Gateway Node",
- Run: func(*cobra.Command, []string) {
+ Run: func(command *cobra.Command, _ []string) {
// create multi-key account
if _, exists := os.LookupEnv("MULTIKEY_MODE"); exists {
bootstrap.RunCreateMultiKeyAccount()
@@ -40,13 +40,15 @@ var Cmd = &cobra.Command{
os.Exit(1)
}

- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context.WithCancel(command.Context())
Contributor Author commented:

The changes in this file are not needed for performance, but they made it slightly easier to test things, so I left them in.

+ done := make(chan struct{})
ready := make(chan struct{})
go func() {
- if err := bootstrap.Run(ctx, cfg, ready); err != nil {
+ defer close(done)
+ err := bootstrap.Run(ctx, cfg, ready)
+ if err != nil {
log.Err(err).Msg("failed to run bootstrap")
cancel()
os.Exit(1)
}
}()

@@ -55,7 +57,15 @@ var Cmd = &cobra.Command{
osSig := make(chan os.Signal, 1)
signal.Notify(osSig, syscall.SIGINT, syscall.SIGTERM)

- <-osSig
+ select {
+ case <-osSig:
+ log.Info().Msg("OS Signal to shutdown received, shutting down")
+ cancel()
+ case <-done:
+ log.Info().Msg("done, shutting down")
+ cancel()
+ }

- log.Info().Msg("OS Signal to shutdown received, shutting down")
- cancel()
},
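For reference, the shutdown coordination introduced above reduces to the pattern below: a minimal, standalone sketch in which run() stands in for bootstrap.Run (hypothetical names, not the gateway's actual code).

    package main

    import (
        "context"
        "errors"
        "fmt"
        "os"
        "os/signal"
        "syscall"
        "time"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        done := make(chan struct{})
        go func() {
            // closing done signals that the worker finished on its own
            defer close(done)
            if err := run(ctx); err != nil && !errors.Is(err, context.Canceled) {
                fmt.Println("worker failed:", err)
            }
        }()

        osSig := make(chan os.Signal, 1)
        signal.Notify(osSig, syscall.SIGINT, syscall.SIGTERM)

        // exit on either an OS signal or the worker terminating by itself
        select {
        case <-osSig:
            fmt.Println("OS signal received, shutting down")
            cancel()
        case <-done:
            fmt.Println("worker done, shutting down")
        }
    }

    // run blocks until its context is cancelled, like a long-running server.
    func run(ctx context.Context) error {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Hour):
            return nil
        }
    }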
2 changes: 2 additions & 0 deletions go.mod
@@ -18,6 +18,7 @@ require (
github.com/sethvargo/go-retry v0.2.3
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.9.0
+ go.uber.org/ratelimit v0.3.1
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/sync v0.8.0
google.golang.org/grpc v1.63.2
@@ -35,6 +36,7 @@ require (
github.com/SaveTheRbtz/mph v0.1.1-0.20240117162131-4166ec7869bc // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
+ github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -69,6 +69,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+ github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
+ github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -762,6 +764,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
+ go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
+ go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
18 changes: 9 additions & 9 deletions services/ingestion/engine.go
@@ -163,9 +163,17 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
Int("cadence-event-length", events.Length()).
Msg("received new cadence evm events")

+ batch := e.store.NewBatch()
+ defer func(batch *pebbleDB.Batch) {
+ err := batch.Close()
+ if err != nil {
+ e.log.Fatal().Err(err).Msg("failed to close batch")
+ }
+ }(batch)
+

// if this is a heartbeat interval with no data, still update the cadence height
if events.Empty() {
- if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), nil); err != nil {
+ if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), batch); err != nil {
Contributor Author commented:

This was a bug.

Collaborator replied:

Nice 👌

return fmt.Errorf(
"failed to update to latest cadence height: %d, during events ingestion: %w",
events.CadenceHeight(),
@@ -176,14 +184,6 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return nil // nothing else to do, this was a heartbeat event with no event payloads
}

- batch := e.store.NewBatch()
- defer func(batch *pebbleDB.Batch) {
- err := batch.Close()
- if err != nil {
- e.log.Fatal().Err(err).Msg("failed to close batch")
- }
- }(batch)
-

// Step 1: Re-execute all transactions on the latest EVM block

// Step 1.1: Notify the `BlocksProvider` of the newly received EVM block
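The batch handling above follows the standard Pebble write-batch lifecycle: create one batch up front (so even the heartbeat-only path writes through it), accumulate writes, commit atomically, and close the batch in a deferred cleanup. A minimal standalone sketch with cockroachdb/pebble, assuming a simple key layout (not the gateway's actual storage code):

    package main

    import (
        "encoding/binary"
        "log"

        "github.com/cockroachdb/pebble"
    )

    func main() {
        db, err := pebble.Open("demo-db", &pebble.Options{})
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // one batch for the whole unit of work, created up front so that even
        // the "no data" path (e.g. a heartbeat height update) goes through it
        batch := db.NewBatch()
        defer func() {
            if err := batch.Close(); err != nil {
                log.Fatalf("failed to close batch: %v", err)
            }
        }()

        height := make([]byte, 8)
        binary.BigEndian.PutUint64(height, 12345)

        // writes accumulate in the batch ...
        if err := batch.Set([]byte("latestCadenceHeight"), height, nil); err != nil {
            log.Fatal(err)
        }

        // ... and become visible atomically on commit
        if err := batch.Commit(pebble.Sync); err != nil {
            log.Fatal(err)
        }
    }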
119 changes: 90 additions & 29 deletions services/ingestion/event_subscriber.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sort"

"github.com/onflow/cadence/common"
"github.com/onflow/flow-go/fvm/evm/events"
Expand Down Expand Up @@ -62,7 +63,8 @@ func NewRPCEventSubscriber(
//
// If an error is encountered during backfill, the subscription will end and the response channel will be closed.
func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
- eventsChan := make(chan models.BlockEvents)
+ // buffered channel so that the decoding of the events can happen in parallel to other operations
+ eventsChan := make(chan models.BlockEvents, 1000)

go func() {
defer func() {
Expand Down Expand Up @@ -190,12 +192,9 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
return eventsChan
}
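The 1000-slot buffer gives the producer room to keep decoding while the consumer is busy. A toy illustration of the effect, with made-up stage durations (names and numbers are illustrative only, not the gateway's pipeline):

    package main

    import (
        "fmt"
        "time"
    )

    // decode simulates steady, CPU-bound event decoding.
    func decode(i int) string {
        time.Sleep(5 * time.Millisecond)
        return fmt.Sprintf("event-%d", i)
    }

    // process simulates downstream work with occasional slow items.
    func process(i int, ev string) {
        d := 5 * time.Millisecond
        if i%10 == 0 {
            d = 50 * time.Millisecond // e.g. a block with many transactions
        }
        time.Sleep(d)
        _ = ev
    }

    func main() {
        start := time.Now()

        // the buffer lets the decoder run ahead while the consumer is stuck
        // on a slow item; with an unbuffered channel every slow item would
        // stall decoding as well
        events := make(chan string, 100)
        go func() {
            defer close(events)
            for i := 0; i < 50; i++ {
                events <- decode(i)
            }
        }()

        i := 0
        for ev := range events {
            process(i, ev)
            i++
        }

        fmt.Println("elapsed:", time.Since(start))
    }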

- // backfill will use the provided height and with the client for the provided spork will start backfilling
- // events. Before subscribing, it will check what is the latest block in the current spork (defined by height)
- // and check for each event it receives whether we reached the end, if we reach the end it will increase
- // the height by one (next height), and check if we are still in previous sporks, if so repeat everything,
- // otherwise return.
- func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
+ // backfill returns a channel that is filled with block events from the provided fromCadenceHeight up to the first
+ // height in the current spork.
+ func (r *RPCEventSubscriber) backfill(ctx context.Context, fromCadenceHeight uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

go func() {
@@ -204,49 +203,111 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
}()

for {
- // check if the current height is still in past sporks, and if not return since we are done with backfilling
- if !r.client.IsPastSpork(height) {
+ // check if the current fromCadenceHeight is still in past sporks, and if not return since we are done with backfilling
+ if !r.client.IsPastSpork(fromCadenceHeight) {
r.logger.Info().
- Uint64("height", height).
+ Uint64("height", fromCadenceHeight).
Msg("completed backfilling")

return
}

- latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height)
+ var err error
+ fromCadenceHeight, err = r.backfillSporkFromHeight(ctx, fromCadenceHeight, eventsChan)
if err != nil {
+ r.logger.Error().Err(err).Msg("error backfilling spork")
eventsChan <- models.NewBlockEventsError(err)
return
}

r.logger.Info().
Uint64("start-height", height).
Uint64("last-spork-height", latestHeight).
Msg("backfilling spork")
Uint64("next-cadence-height", fromCadenceHeight).
Msg("reached the end of spork, checking next spork")
}
}()

for ev := range r.subscribe(ctx, height) {
eventsChan <- ev
return eventsChan
}

if ev.Err != nil {
return
}
// maxRangeForGetEvents is the maximum range of blocks that can be fetched using the GetEventsForHeightRange method.
const maxRangeForGetEvents = uint64(249)

// / backfillSporkFromHeight will fill the eventsChan with block events from the provided fromHeight up to the first height in the spork that comes
// after the spork of the provided fromHeight.
func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)

lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromCadenceHeight)
if err != nil {
eventsChan <- models.NewBlockEventsError(err)
return 0, err
}

r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", ev.Events.CadenceHeight(), latestHeight))
r.logger.Info().
Uint64("start-height", fromCadenceHeight).
Uint64("last-spork-height", lastHeight).
Msg("backfilling spork")

if ev.Events != nil && ev.Events.CadenceHeight() == latestHeight {
height = ev.Events.CadenceHeight() + 1 // go to next height in the next spork
for fromCadenceHeight < lastHeight {
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d] ...", fromCadenceHeight, lastHeight))

r.logger.Info().
Uint64("next-height", height).
Msg("reached the end of spork, checking next spork")
startHeight := fromCadenceHeight
endHeight := fromCadenceHeight + maxRangeForGetEvents
if endHeight > lastHeight {
endHeight = lastHeight
}

break
}
blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
if err != nil {
return 0, fmt.Errorf("failed to get block events: %w", err)
}

transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight)
if err != nil {
return 0, fmt.Errorf("failed to get block events: %w", err)
}

if len(transactions) != len(blocks) {
return 0, fmt.Errorf("transactions and blocks have different length")
}

// sort both, just in case
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Height < blocks[j].Height
})
sort.Slice(transactions, func(i, j int) bool {
return transactions[i].Height < transactions[j].Height
})

for i := range transactions {
if transactions[i].Height != blocks[i].Height {
return 0, fmt.Errorf("transactions and blocks have different height")
}

// append the transaction events to the block events
blocks[i].Events = append(blocks[i].Events, transactions[i].Events...)

evmEvents := models.NewBlockEvents(blocks[i])
eventsChan <- evmEvents

// advance the height
fromCadenceHeight = evmEvents.Events.CadenceHeight() + 1
}
}()

return eventsChan
}
return fromCadenceHeight, nil
}

// fetchMissingData is used as a backup mechanism for fetching EVM-related
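The new backfill path is the core of the performance win: rather than replaying the old block-by-block subscription, it pulls events for up to 250 heights at a time (maxRangeForGetEvents is an inclusive offset, so 249 means 250 heights per call) and zips block events with their transaction events. A self-contained sketch of just the windowing-and-merge arithmetic, with simplified types in place of flow.BlockEvents:

    package main

    import (
        "fmt"
        "sort"
    )

    // BlockEvents is a simplified stand-in for flow.BlockEvents.
    type BlockEvents struct {
        Height uint64
        Events []string
    }

    const maxRange = uint64(249) // inclusive range, so up to 250 heights per call

    // fetch simulates GetEventsForHeightRange for one event type.
    func fetch(eventType string, start, end uint64) []BlockEvents {
        out := make([]BlockEvents, 0, end-start+1)
        for h := start; h <= end; h++ {
            out = append(out, BlockEvents{Height: h, Events: []string{fmt.Sprintf("%s@%d", eventType, h)}})
        }
        return out
    }

    func backfillRange(from, last uint64) (uint64, error) {
        for from < last {
            start, end := from, from+maxRange
            if end > last {
                end = last // clamp the final window to the spork boundary
            }

            blocks := fetch("blockExecuted", start, end)
            transactions := fetch("transactionExecuted", start, end)

            if len(transactions) != len(blocks) {
                return 0, fmt.Errorf("transactions and blocks have different lengths")
            }

            // sort both, then zip them together height by height
            sort.Slice(blocks, func(i, j int) bool { return blocks[i].Height < blocks[j].Height })
            sort.Slice(transactions, func(i, j int) bool { return transactions[i].Height < transactions[j].Height })

            for i := range blocks {
                if transactions[i].Height != blocks[i].Height {
                    return 0, fmt.Errorf("transactions and blocks have different heights")
                }
                blocks[i].Events = append(blocks[i].Events, transactions[i].Events...)
                from = blocks[i].Height + 1 // advance past the processed height
            }
        }
        return from, nil
    }

    func main() {
        next, err := backfillRange(1000, 1600)
        fmt.Println(next, err) // 1601 <nil>
    }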
6 changes: 4 additions & 2 deletions services/ingestion/event_subscriber_test.go
@@ -205,10 +205,10 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
prevHeight = eventHeight

if eventHeight == cadenceHeight {
- assert.Equal(t, evmBlock, ev.Events.Block())
+ require.Equal(t, evmBlock, ev.Events.Block())
for i := 0; i < len(txHashes); i++ {
tx := ev.Events.Transactions()[i]
- assert.Equal(t, txHashes[i], tx.Hash())
+ require.Equal(t, txHashes[i], tx.Hash())
}
}
}
@@ -417,6 +417,8 @@ func setupClientForBackupEventFetching(
cadenceHeight,
).Return([]flow.BlockEvents{evmTxEvents}, nil).Once()

+ client.GetEventsForHeightRangeFunc = nil

client.SubscribeEventsByBlockHeightFunc = func(
ctx context.Context,
startHeight uint64,
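The switch from assert to require is deliberate: assert records a failure and lets the test keep running, while require stops immediately, which matters when later statements would dereference the value that just failed the check. A generic testify illustration (not the gateway's test):

    package demo

    import (
        "testing"

        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
    )

    func TestAssertVsRequire(t *testing.T) {
        var values []int

        // assert records the failure but keeps executing, so the index
        // below would still be reached and panic on an empty slice
        assert.Len(t, values, 1)

        // require calls t.FailNow(), stopping the test here instead
        require.Len(t, values, 1)

        _ = values[0] // never reached when the require above fails
    }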
39 changes: 34 additions & 5 deletions services/requester/cross-spork_client.go
@@ -5,26 +5,35 @@ import (
"fmt"

"github.com/onflow/cadence"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go-sdk/access"
flowGo "github.com/onflow/flow-go/model/flow"
"github.com/rs/zerolog"
"go.uber.org/ratelimit"
"golang.org/x/exp/slices"

errs "github.com/onflow/flow-evm-gateway/models/errors"
)

type sporkClient struct {
- firstHeight uint64
- lastHeight  uint64
- client      access.Client
+ firstHeight                    uint64
+ lastHeight                     uint64
+ client                         access.Client
+ getEventsForHeightRangeLimiter ratelimit.Limiter
}

// contains checks if the provided height is within the range of available heights
func (s *sporkClient) contains(height uint64) bool {
return height >= s.firstHeight && height <= s.lastHeight
}

+ func (s *sporkClient) GetEventsForHeightRange(
+ ctx context.Context, eventType string, startHeight uint64, endHeight uint64,
+ ) ([]flow.BlockEvents, error) {
+ s.getEventsForHeightRangeLimiter.Take()
+
+ return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
+ }

type sporkClients []*sporkClient

// addSpork will add a new spork host defined by the first and last height boundary in that spork.
@@ -48,6 +57,8 @@ func (s *sporkClients) add(logger zerolog.Logger, client access.Client) error {
firstHeight: info.NodeRootBlockHeight,
lastHeight: header.Height,
client: client,
+ // TODO (JanezP): Make this configurable
+ getEventsForHeightRangeLimiter: ratelimit.New(100, ratelimit.WithoutSlack),
})

// make sure clients are always sorted
@@ -214,3 +225,21 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight(
}
return client.SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...)
}

+ func (c *CrossSporkClient) GetEventsForHeightRange(
+ ctx context.Context, eventType string, startHeight uint64, endHeight uint64,
+ ) ([]flow.BlockEvents, error) {
+ client, err := c.getClientForHeight(startHeight)
+ if err != nil {
+ return nil, err
+ }
+ endClient, err := c.getClientForHeight(endHeight)
+ if err != nil {
+ return nil, err
+ }
+ // there is one client reference per spork, so we can compare the clients
+ if endClient != client {
+ return nil, fmt.Errorf("invalid height range, end height %d is not in the same spork as start height %d", endHeight, startHeight)
+ }
+ return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
+ }
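go.uber.org/ratelimit implements a blocking leaky-bucket limiter: each Take() sleeps just long enough to keep callers at the configured rate, and WithoutSlack disables burst credit for idle time. A minimal sketch of the wrapping pattern used above (fetchEvents is a hypothetical stand-in for an access-node call):

    package main

    import (
        "fmt"
        "time"

        "go.uber.org/ratelimit"
    )

    func main() {
        // 100 requests/second, no accumulated burst allowance
        rl := ratelimit.New(100, ratelimit.WithoutSlack)

        start := time.Now()
        for i := 0; i < 10; i++ {
            rl.Take() // blocks until the next permitted slot (~10ms apart)
            fetchEvents(i)
        }
        // roughly 90ms minimum for 10 calls at 100/s
        fmt.Println("elapsed:", time.Since(start))
    }

    // fetchEvents stands in for an access-node RPC such as GetEventsForHeightRange.
    func fetchEvents(i int) {
        _ = i
    }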