diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 3b93e45a..6b87e04c 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -28,7 +28,7 @@ import ( var Cmd = &cobra.Command{ Use: "run", Short: "Runs the EVM Gateway Node", - Run: func(*cobra.Command, []string) { + Run: func(command *cobra.Command, _ []string) { // create multi-key account if _, exists := os.LookupEnv("MULTIKEY_MODE"); exists { bootstrap.RunCreateMultiKeyAccount() @@ -40,13 +40,15 @@ var Cmd = &cobra.Command{ os.Exit(1) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(command.Context()) + done := make(chan struct{}) ready := make(chan struct{}) go func() { - if err := bootstrap.Run(ctx, cfg, ready); err != nil { + defer close(done) + err := bootstrap.Run(ctx, cfg, ready) + if err != nil { log.Err(err).Msg("failed to run bootstrap") cancel() - os.Exit(1) } }() @@ -55,7 +57,15 @@ var Cmd = &cobra.Command{ osSig := make(chan os.Signal, 1) signal.Notify(osSig, syscall.SIGINT, syscall.SIGTERM) - <-osSig + select { + case <-osSig: + log.Info().Msg("OS Signal to shutdown received, shutting down") + cancel() + case <-done: + log.Info().Msg("done, shutting down") + cancel() + } + log.Info().Msg("OS Signal to shutdown received, shutting down") cancel() }, diff --git a/go.mod b/go.mod index 335ec24e..5fcd558f 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/sethvargo/go-retry v0.2.3 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 + go.uber.org/ratelimit v0.3.1 golang.org/x/exp v0.0.0-20240119083558-1b970713d09a golang.org/x/sync v0.8.0 google.golang.org/grpc v1.63.2 @@ -35,6 +36,7 @@ require ( github.com/SaveTheRbtz/mph v0.1.1-0.20240117162131-4166ec7869bc // indirect github.com/StackExchange/wmi v1.2.1 // indirect github.com/VictoriaMetrics/fastcache v1.12.2 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.10.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect diff --git a/go.sum b/go.sum index 6fe1a45a..98d1b96b 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -762,6 +764,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= +go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 47997039..cb4c1494 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -163,9 +163,17 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { Int("cadence-event-length", events.Length()). Msg("received new cadence evm events") + batch := e.store.NewBatch() + defer func(batch *pebbleDB.Batch) { + err := batch.Close() + if err != nil { + e.log.Fatal().Err(err).Msg("failed to close batch") + } + }(batch) + // if heartbeat interval with no data still update the cadence height if events.Empty() { - if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), nil); err != nil { + if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), batch); err != nil { return fmt.Errorf( "failed to update to latest cadence height: %d, during events ingestion: %w", events.CadenceHeight(), @@ -176,14 +184,6 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { return nil // nothing else to do this was heartbeat event with not event payloads } - batch := e.store.NewBatch() - defer func(batch *pebbleDB.Batch) { - err := batch.Close() - if err != nil { - e.log.Fatal().Err(err).Msg("failed to close batch") - } - }(batch) - // Step 1: Re-execute all transactions on the latest EVM block // Step 1.1: Notify the `BlocksProvider` of the newly received EVM block diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index 47da8972..3138269c 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sort" "github.com/onflow/cadence/common" "github.com/onflow/flow-go/fvm/evm/events" @@ -62,7 +63,8 @@ func NewRPCEventSubscriber( // // If error is encountered during backfill the subscription will end and the response chanel will be closed. func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents { - eventsChan := make(chan models.BlockEvents) + // buffered channel so that the decoding of the events can happen in parallel to other operations + eventsChan := make(chan models.BlockEvents, 1000) go func() { defer func() { @@ -190,12 +192,9 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return eventsChan } -// backfill will use the provided height and with the client for the provided spork will start backfilling -// events. Before subscribing, it will check what is the latest block in the current spork (defined by height) -// and check for each event it receives whether we reached the end, if we reach the end it will increase -// the height by one (next height), and check if we are still in previous sporks, if so repeat everything, -// otherwise return. -func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents { +// backfill returns a channel that is filled with block events from the provided fromCadenceHeight up to the first +// height in the current spork. +func (r *RPCEventSubscriber) backfill(ctx context.Context, fromCadenceHeight uint64) <-chan models.BlockEvents { eventsChan := make(chan models.BlockEvents) go func() { @@ -204,49 +203,111 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan }() for { - // check if the current height is still in past sporks, and if not return since we are done with backfilling - if !r.client.IsPastSpork(height) { + // check if the current fromCadenceHeight is still in past sporks, and if not return since we are done with backfilling + if !r.client.IsPastSpork(fromCadenceHeight) { r.logger.Info(). - Uint64("height", height). + Uint64("height", fromCadenceHeight). Msg("completed backfilling") return } - latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height) + var err error + fromCadenceHeight, err = r.backfillSporkFromHeight(ctx, fromCadenceHeight, eventsChan) if err != nil { + r.logger.Error().Err(err).Msg("error backfilling spork") eventsChan <- models.NewBlockEventsError(err) return } r.logger.Info(). - Uint64("start-height", height). - Uint64("last-spork-height", latestHeight). - Msg("backfilling spork") + Uint64("next-cadence-height", fromCadenceHeight). + Msg("reached the end of spork, checking next spork") + } + }() - for ev := range r.subscribe(ctx, height) { - eventsChan <- ev + return eventsChan +} - if ev.Err != nil { - return - } +// maxRangeForGetEvents is the maximum range of blocks that can be fetched using the GetEventsForHeightRange method. +const maxRangeForGetEvents = uint64(249) + +// / backfillSporkFromHeight will fill the eventsChan with block events from the provided fromHeight up to the first height in the spork that comes +// after the spork of the provided fromHeight. +func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) { + evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address) + + lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromCadenceHeight) + if err != nil { + eventsChan <- models.NewBlockEventsError(err) + return 0, err + } - r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", ev.Events.CadenceHeight(), latestHeight)) + r.logger.Info(). + Uint64("start-height", fromCadenceHeight). + Uint64("last-spork-height", lastHeight). + Msg("backfilling spork") - if ev.Events != nil && ev.Events.CadenceHeight() == latestHeight { - height = ev.Events.CadenceHeight() + 1 // go to next height in the next spork + for fromCadenceHeight < lastHeight { + r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d] ...", fromCadenceHeight, lastHeight)) - r.logger.Info(). - Uint64("next-height", height). - Msg("reached the end of spork, checking next spork") + startHeight := fromCadenceHeight + endHeight := fromCadenceHeight + maxRangeForGetEvents + if endHeight > lastHeight { + endHeight = lastHeight + } - break - } + blockExecutedEvent := common.NewAddressLocation( + nil, + evmAddress, + string(events.EventTypeBlockExecuted), + ).ID() + + transactionExecutedEvent := common.NewAddressLocation( + nil, + evmAddress, + string(events.EventTypeTransactionExecuted), + ).ID() + + blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight) + if err != nil { + return 0, fmt.Errorf("failed to get block events: %w", err) + } + + transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight) + if err != nil { + return 0, fmt.Errorf("failed to get block events: %w", err) + } + + if len(transactions) != len(blocks) { + return 0, fmt.Errorf("transactions and blocks have different length") + } + + // sort both, just in case + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Height < blocks[j].Height + }) + sort.Slice(transactions, func(i, j int) bool { + return transactions[i].Height < transactions[j].Height + }) + + for i := range transactions { + if transactions[i].Height != blocks[i].Height { + return 0, fmt.Errorf("transactions and blocks have different height") } + + // append the transaction events to the block events + blocks[i].Events = append(blocks[i].Events, transactions[i].Events...) + + evmEvents := models.NewBlockEvents(blocks[i]) + eventsChan <- evmEvents + + // advance the height + fromCadenceHeight = evmEvents.Events.CadenceHeight() + 1 } - }() - return eventsChan + } + return fromCadenceHeight, nil } // fetchMissingData is used as a backup mechanism for fetching EVM-related diff --git a/services/ingestion/event_subscriber_test.go b/services/ingestion/event_subscriber_test.go index 04626af2..7505533a 100644 --- a/services/ingestion/event_subscriber_test.go +++ b/services/ingestion/event_subscriber_test.go @@ -205,10 +205,10 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { prevHeight = eventHeight if eventHeight == cadenceHeight { - assert.Equal(t, evmBlock, ev.Events.Block()) + require.Equal(t, evmBlock, ev.Events.Block()) for i := 0; i < len(txHashes); i++ { tx := ev.Events.Transactions()[i] - assert.Equal(t, txHashes[i], tx.Hash()) + require.Equal(t, txHashes[i], tx.Hash()) } } } @@ -417,6 +417,8 @@ func setupClientForBackupEventFetching( cadenceHeight, ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() + client.GetEventsForHeightRangeFunc = nil + client.SubscribeEventsByBlockHeightFunc = func( ctx context.Context, startHeight uint64, diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index a9df208e..4025aa3c 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -5,19 +5,20 @@ import ( "fmt" "github.com/onflow/cadence" + errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/access" flowGo "github.com/onflow/flow-go/model/flow" "github.com/rs/zerolog" + "go.uber.org/ratelimit" "golang.org/x/exp/slices" - - errs "github.com/onflow/flow-evm-gateway/models/errors" ) type sporkClient struct { - firstHeight uint64 - lastHeight uint64 - client access.Client + firstHeight uint64 + lastHeight uint64 + client access.Client + getEventsForHeightRangeLimiter ratelimit.Limiter } // contains checks if the provided height is withing the range of available heights @@ -25,6 +26,14 @@ func (s *sporkClient) contains(height uint64) bool { return height >= s.firstHeight && height <= s.lastHeight } +func (s *sporkClient) GetEventsForHeightRange( + ctx context.Context, eventType string, startHeight uint64, endHeight uint64, +) ([]flow.BlockEvents, error) { + s.getEventsForHeightRangeLimiter.Take() + + return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) +} + type sporkClients []*sporkClient // addSpork will add a new spork host defined by the first and last height boundary in that spork. @@ -48,6 +57,8 @@ func (s *sporkClients) add(logger zerolog.Logger, client access.Client) error { firstHeight: info.NodeRootBlockHeight, lastHeight: header.Height, client: client, + // TODO (JanezP): Make this configurable + getEventsForHeightRangeLimiter: ratelimit.New(100, ratelimit.WithoutSlack), }) // make sure clients are always sorted @@ -214,3 +225,21 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight( } return client.SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...) } + +func (c *CrossSporkClient) GetEventsForHeightRange( + ctx context.Context, eventType string, startHeight uint64, endHeight uint64, +) ([]flow.BlockEvents, error) { + client, err := c.getClientForHeight(startHeight) + if err != nil { + return nil, err + } + endClient, err := c.getClientForHeight(endHeight) + if err != nil { + return nil, err + } + // there is one client reference per spork, so we can compare the clients + if endClient != client { + return nil, fmt.Errorf("invalid height range, end height %d is not in the same spork as start height %d", endHeight, startHeight) + } + return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) +} diff --git a/services/testutils/mock_client.go b/services/testutils/mock_client.go index 3e4c7faf..2f65021a 100644 --- a/services/testutils/mock_client.go +++ b/services/testutils/mock_client.go @@ -15,6 +15,9 @@ type MockClient struct { GetBlockHeaderByHeightFunc func(context.Context, uint64) (*flow.BlockHeader, error) SubscribeEventsByBlockHeightFunc func(context.Context, uint64, flow.EventFilter, ...access.SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) GetNodeVersionInfoFunc func(ctx context.Context) (*flow.NodeVersionInfo, error) + GetEventsForHeightRangeFunc func( + ctx context.Context, eventType string, startHeight uint64, endHeight uint64, + ) ([]flow.BlockEvents, error) } func (c *MockClient) GetBlockHeaderByHeight(ctx context.Context, height uint64) (*flow.BlockHeader, error) { @@ -38,6 +41,15 @@ func (c *MockClient) SubscribeEventsByBlockHeight( return c.SubscribeEventsByBlockHeightFunc(ctx, startHeight, filter, opts...) } +func (c *MockClient) GetEventsForHeightRange( + ctx context.Context, eventType string, startHeight uint64, endHeight uint64, +) ([]flow.BlockEvents, error) { + if c.GetEventsForHeightRangeFunc != nil { + return c.GetEventsForHeightRangeFunc(ctx, eventType, startHeight, endHeight) + } + return c.Client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) +} + func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient { client, events := SetupClient(startHeight, endHeight) go func() { @@ -85,5 +97,24 @@ func SetupClient(startHeight uint64, endHeight uint64) (*MockClient, chan flow.B ) (<-chan flow.BlockEvents, <-chan error, error) { return events, make(chan error), nil }, + GetEventsForHeightRangeFunc: func( + ctx context.Context, eventType string, sh uint64, eh uint64, + ) ([]flow.BlockEvents, error) { + if sh < startHeight || sh > endHeight { + return nil, storage.ErrNotFound + } + if eh < startHeight || eh > endHeight { + return nil, storage.ErrNotFound + } + + evts := make([]flow.BlockEvents, 0, eh-sh+1) + for i := uint64(0); i <= eh-sh; i++ { + evts = append(evts, flow.BlockEvents{ + Height: sh + i, + }) + } + + return evts, nil + }, }, events } diff --git a/tests/go.mod b/tests/go.mod index c61c7ed2..c42584d0 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -28,6 +28,7 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/VictoriaMetrics/fastcache v1.12.2 // indirect github.com/allegro/bigcache v1.2.1 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.10.0 // indirect github.com/btcsuite/btcd v0.21.0-beta // indirect @@ -217,6 +218,7 @@ require ( go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect + go.uber.org/ratelimit v0.3.1 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.26.0 // indirect golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect diff --git a/tests/go.sum b/tests/go.sum index 1311f1f0..997f8b26 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -1124,6 +1124,8 @@ go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= +go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=