From cece1e8c606e608ba5badfd03320a2c8c4d2fb16 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Fri, 8 Nov 2024 17:37:06 +0100 Subject: [PATCH 1/6] batch get events when backfilling --- cmd/run/cmd.go | 9 +++ services/ingestion/event_subscriber.go | 83 ++++++++++++++++++++---- services/requester/cross-spork_client.go | 11 ++++ 3 files changed, 92 insertions(+), 11 deletions(-) diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 3b93e45a..1f1ec07f 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -7,6 +7,7 @@ import ( "math/big" "os" "os/signal" + "runtime/pprof" "strings" "syscall" "time" @@ -29,6 +30,14 @@ var Cmd = &cobra.Command{ Use: "run", Short: "Runs the EVM Gateway Node", Run: func(*cobra.Command, []string) { + + f, err := os.Create("cpu.pprof") + if err != nil { + log.Fatal().Err(err).Msg("could not create cpu profile") + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + // create multi-key account if _, exists := os.LookupEnv("MULTIKEY_MODE"); exists { bootstrap.RunCreateMultiKeyAccount() diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index 47da8972..a04d2e00 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "sort" + "time" "github.com/onflow/cadence/common" "github.com/onflow/flow-go/fvm/evm/events" @@ -62,7 +64,8 @@ func NewRPCEventSubscriber( // // If error is encountered during backfill the subscription will end and the response chanel will be closed. func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents { - eventsChan := make(chan models.BlockEvents) + // buffered channel so that the decoding of the events can happen in parallel to other operations + eventsChan := make(chan models.BlockEvents, 1000) go func() { defer func() { @@ -196,6 +199,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha // the height by one (next height), and check if we are still in previous sporks, if so repeat everything, // otherwise return. func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents { + // TODO(JanezP): if we are backfilling, its more efficient to request events in a batch eventsChan := make(chan models.BlockEvents) go func() { @@ -224,25 +228,82 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan Uint64("last-spork-height", latestHeight). Msg("backfilling spork") - for ev := range r.subscribe(ctx, height) { - eventsChan <- ev + ticker := time.NewTicker(time.Millisecond * 10) - if ev.Err != nil { + maxRange := uint64(249) + for height < latestHeight { + + // TODO: do rate limiting better + <-ticker.C + + startHeight := height + r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", startHeight, latestHeight)) + endHeight := height + maxRange + if endHeight > latestHeight { + endHeight = latestHeight + } + + evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address) + blockExecutedEvent := common.NewAddressLocation( + nil, + evmAddress, + string(events.EventTypeBlockExecuted), + ).ID() + + transactionExecutedEvent := common.NewAddressLocation( + nil, + evmAddress, + string(events.EventTypeTransactionExecuted), + ).ID() + + // + blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight) + if err != nil { + r.logger.Error().Err(err).Msg("failed to get block events") + eventsChan <- models.NewBlockEventsError(err) + return + } + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Height < blocks[j].Height + }) + + transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight) + if err != nil { + r.logger.Error().Err(err).Msg("failed to get block events") + eventsChan <- models.NewBlockEventsError(err) return } - r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", ev.Events.CadenceHeight(), latestHeight)) + sort.Slice(transactions, func(i, j int) bool { + return transactions[i].Height < transactions[j].Height + }) + + if len(transactions) != len(blocks) { + r.logger.Error().Msg("transactions and blocks have different length") + eventsChan <- models.NewBlockEventsError(err) + return + } - if ev.Events != nil && ev.Events.CadenceHeight() == latestHeight { - height = ev.Events.CadenceHeight() + 1 // go to next height in the next spork + for i := range transactions { + if transactions[i].Height != blocks[i].Height { + r.logger.Error().Msg("transactions and blocks have different height") + eventsChan <- models.NewBlockEventsError(err) + return + } + // append the transaction events to the block events + blocks[i].Events = append(blocks[i].Events, transactions[i].Events...) - r.logger.Info(). - Uint64("next-height", height). - Msg("reached the end of spork, checking next spork") + evmEvents := models.NewBlockEvents(blocks[i]) + height = evmEvents.Events.CadenceHeight() + 1 - break } + } + ticker.Stop() + + r.logger.Info(). + Uint64("next-height", height). + Msg("reached the end of spork, checking next spork") } }() diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index a9df208e..91cc7634 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -214,3 +214,14 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight( } return client.SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...) } + +func (c *CrossSporkClient) GetEventsForHeightRange( + ctx context.Context, eventType string, startHeight uint64, endHeight uint64, +) ([]flow.BlockEvents, error) { + // TODO: also make sure the endHeight is not too high + client, err := c.getClientForHeight(startHeight) + if err != nil { + return nil, err + } + return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) +} From cb45e91ba03b7dc2c70c5a63b841575d93d2fced Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 13 Nov 2024 15:37:11 +0100 Subject: [PATCH 2/6] cleanup --- cmd/run/cmd.go | 29 ++--- go.mod | 2 + go.sum | 4 + services/ingestion/engine.go | 18 +-- services/ingestion/event_subscriber.go | 150 +++++++++++------------ services/requester/cross-spork_client.go | 30 ++++- services/testutils/mock_client.go | 14 +++ 7 files changed, 143 insertions(+), 104 deletions(-) diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 1f1ec07f..6b87e04c 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -7,7 +7,6 @@ import ( "math/big" "os" "os/signal" - "runtime/pprof" "strings" "syscall" "time" @@ -29,15 +28,7 @@ import ( var Cmd = &cobra.Command{ Use: "run", Short: "Runs the EVM Gateway Node", - Run: func(*cobra.Command, []string) { - - f, err := os.Create("cpu.pprof") - if err != nil { - log.Fatal().Err(err).Msg("could not create cpu profile") - } - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() - + Run: func(command *cobra.Command, _ []string) { // create multi-key account if _, exists := os.LookupEnv("MULTIKEY_MODE"); exists { bootstrap.RunCreateMultiKeyAccount() @@ -49,13 +40,15 @@ var Cmd = &cobra.Command{ os.Exit(1) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(command.Context()) + done := make(chan struct{}) ready := make(chan struct{}) go func() { - if err := bootstrap.Run(ctx, cfg, ready); err != nil { + defer close(done) + err := bootstrap.Run(ctx, cfg, ready) + if err != nil { log.Err(err).Msg("failed to run bootstrap") cancel() - os.Exit(1) } }() @@ -64,7 +57,15 @@ var Cmd = &cobra.Command{ osSig := make(chan os.Signal, 1) signal.Notify(osSig, syscall.SIGINT, syscall.SIGTERM) - <-osSig + select { + case <-osSig: + log.Info().Msg("OS Signal to shutdown received, shutting down") + cancel() + case <-done: + log.Info().Msg("done, shutting down") + cancel() + } + log.Info().Msg("OS Signal to shutdown received, shutting down") cancel() }, diff --git a/go.mod b/go.mod index 335ec24e..5fcd558f 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/sethvargo/go-retry v0.2.3 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 + go.uber.org/ratelimit v0.3.1 golang.org/x/exp v0.0.0-20240119083558-1b970713d09a golang.org/x/sync v0.8.0 google.golang.org/grpc v1.63.2 @@ -35,6 +36,7 @@ require ( github.com/SaveTheRbtz/mph v0.1.1-0.20240117162131-4166ec7869bc // indirect github.com/StackExchange/wmi v1.2.1 // indirect github.com/VictoriaMetrics/fastcache v1.12.2 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.10.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect diff --git a/go.sum b/go.sum index 6fe1a45a..98d1b96b 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -762,6 +764,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= +go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 47997039..cb4c1494 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -163,9 +163,17 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { Int("cadence-event-length", events.Length()). Msg("received new cadence evm events") + batch := e.store.NewBatch() + defer func(batch *pebbleDB.Batch) { + err := batch.Close() + if err != nil { + e.log.Fatal().Err(err).Msg("failed to close batch") + } + }(batch) + // if heartbeat interval with no data still update the cadence height if events.Empty() { - if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), nil); err != nil { + if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), batch); err != nil { return fmt.Errorf( "failed to update to latest cadence height: %d, during events ingestion: %w", events.CadenceHeight(), @@ -176,14 +184,6 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { return nil // nothing else to do this was heartbeat event with not event payloads } - batch := e.store.NewBatch() - defer func(batch *pebbleDB.Batch) { - err := batch.Close() - if err != nil { - e.log.Fatal().Err(err).Msg("failed to close batch") - } - }(batch) - // Step 1: Re-execute all transactions on the latest EVM block // Step 1.1: Notify the `BlocksProvider` of the newly received EVM block diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index a04d2e00..ad919af8 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "sort" - "time" "github.com/onflow/cadence/common" "github.com/onflow/flow-go/fvm/evm/events" @@ -198,8 +197,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha // and check for each event it receives whether we reached the end, if we reach the end it will increase // the height by one (next height), and check if we are still in previous sporks, if so repeat everything, // otherwise return. -func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents { - // TODO(JanezP): if we are backfilling, its more efficient to request events in a batch +func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) <-chan models.BlockEvents { eventsChan := make(chan models.BlockEvents) go func() { @@ -208,106 +206,108 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan }() for { - // check if the current height is still in past sporks, and if not return since we are done with backfilling - if !r.client.IsPastSpork(height) { + // check if the current currentHeight is still in past sporks, and if not return since we are done with backfilling + if !r.client.IsPastSpork(currentHeight) { r.logger.Info(). - Uint64("height", height). + Uint64("height", currentHeight). Msg("completed backfilling") return } - latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height) + currentHeight, err := r.backfillSpork(ctx, currentHeight, eventsChan) if err != nil { + r.logger.Error().Err(err).Msg("error backfilling spork") eventsChan <- models.NewBlockEventsError(err) return } r.logger.Info(). - Uint64("start-height", height). - Uint64("last-spork-height", latestHeight). - Msg("backfilling spork") + Uint64("next-height", currentHeight). + Msg("reached the end of spork, checking next spork") + } + }() - ticker := time.NewTicker(time.Millisecond * 10) + return eventsChan +} - maxRange := uint64(249) - for height < latestHeight { +// maxRangeForGetEvents is the maximum range of blocks that can be fetched using the GetEventsForHeightRange method. +const maxRangeForGetEvents = uint64(249) - // TODO: do rate limiting better - <-ticker.C +func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) { + evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address) - startHeight := height - r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", startHeight, latestHeight)) - endHeight := height + maxRange - if endHeight > latestHeight { - endHeight = latestHeight - } + lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromHeight) + if err != nil { + eventsChan <- models.NewBlockEventsError(err) + return 0, err + } - evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address) - blockExecutedEvent := common.NewAddressLocation( - nil, - evmAddress, - string(events.EventTypeBlockExecuted), - ).ID() - - transactionExecutedEvent := common.NewAddressLocation( - nil, - evmAddress, - string(events.EventTypeTransactionExecuted), - ).ID() - - // - blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight) - if err != nil { - r.logger.Error().Err(err).Msg("failed to get block events") - eventsChan <- models.NewBlockEventsError(err) - return - } - sort.Slice(blocks, func(i, j int) bool { - return blocks[i].Height < blocks[j].Height - }) + r.logger.Info(). + Uint64("start-height", fromHeight). + Uint64("last-spork-height", lastHeight). + Msg("backfilling spork") - transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight) - if err != nil { - r.logger.Error().Err(err).Msg("failed to get block events") - eventsChan <- models.NewBlockEventsError(err) - return - } + for fromHeight < lastHeight { + r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d] ...", fromHeight, lastHeight)) - sort.Slice(transactions, func(i, j int) bool { - return transactions[i].Height < transactions[j].Height - }) + startHeight := fromHeight + endHeight := fromHeight + maxRangeForGetEvents + if endHeight > lastHeight { + endHeight = lastHeight + } - if len(transactions) != len(blocks) { - r.logger.Error().Msg("transactions and blocks have different length") - eventsChan <- models.NewBlockEventsError(err) - return - } + blockExecutedEvent := common.NewAddressLocation( + nil, + evmAddress, + string(events.EventTypeBlockExecuted), + ).ID() - for i := range transactions { - if transactions[i].Height != blocks[i].Height { - r.logger.Error().Msg("transactions and blocks have different height") - eventsChan <- models.NewBlockEventsError(err) - return - } - // append the transaction events to the block events - blocks[i].Events = append(blocks[i].Events, transactions[i].Events...) + transactionExecutedEvent := common.NewAddressLocation( + nil, + evmAddress, + string(events.EventTypeTransactionExecuted), + ).ID() - evmEvents := models.NewBlockEvents(blocks[i]) - height = evmEvents.Events.CadenceHeight() + 1 + blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight) + if err != nil { + return 0, fmt.Errorf("failed to get block events: %w", err) + } - } + transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight) + if err != nil { + return 0, fmt.Errorf("failed to get block events: %w", err) + } + // sort both, just in case + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Height < blocks[j].Height + }) + sort.Slice(transactions, func(i, j int) bool { + return transactions[i].Height < transactions[j].Height + }) + + if len(transactions) != len(blocks) { + return 0, fmt.Errorf("transactions and blocks have different length") + } + + for i := range transactions { + if transactions[i].Height != blocks[i].Height { + return 0, fmt.Errorf("transactions and blocks have different height") } - ticker.Stop() - r.logger.Info(). - Uint64("next-height", height). - Msg("reached the end of spork, checking next spork") + // append the transaction events to the block events + blocks[i].Events = append(blocks[i].Events, transactions[i].Events...) + + evmEvents := models.NewBlockEvents(blocks[i]) + eventsChan <- evmEvents + + // advance the height + fromHeight = evmEvents.Events.CadenceHeight() + 1 } - }() - return eventsChan + } + return fromHeight, nil } // fetchMissingData is used as a backup mechanism for fetching EVM-related diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index 91cc7634..4025aa3c 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -5,19 +5,20 @@ import ( "fmt" "github.com/onflow/cadence" + errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/access" flowGo "github.com/onflow/flow-go/model/flow" "github.com/rs/zerolog" + "go.uber.org/ratelimit" "golang.org/x/exp/slices" - - errs "github.com/onflow/flow-evm-gateway/models/errors" ) type sporkClient struct { - firstHeight uint64 - lastHeight uint64 - client access.Client + firstHeight uint64 + lastHeight uint64 + client access.Client + getEventsForHeightRangeLimiter ratelimit.Limiter } // contains checks if the provided height is withing the range of available heights @@ -25,6 +26,14 @@ func (s *sporkClient) contains(height uint64) bool { return height >= s.firstHeight && height <= s.lastHeight } +func (s *sporkClient) GetEventsForHeightRange( + ctx context.Context, eventType string, startHeight uint64, endHeight uint64, +) ([]flow.BlockEvents, error) { + s.getEventsForHeightRangeLimiter.Take() + + return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) +} + type sporkClients []*sporkClient // addSpork will add a new spork host defined by the first and last height boundary in that spork. @@ -48,6 +57,8 @@ func (s *sporkClients) add(logger zerolog.Logger, client access.Client) error { firstHeight: info.NodeRootBlockHeight, lastHeight: header.Height, client: client, + // TODO (JanezP): Make this configurable + getEventsForHeightRangeLimiter: ratelimit.New(100, ratelimit.WithoutSlack), }) // make sure clients are always sorted @@ -218,10 +229,17 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight( func (c *CrossSporkClient) GetEventsForHeightRange( ctx context.Context, eventType string, startHeight uint64, endHeight uint64, ) ([]flow.BlockEvents, error) { - // TODO: also make sure the endHeight is not too high client, err := c.getClientForHeight(startHeight) if err != nil { return nil, err } + endClient, err := c.getClientForHeight(endHeight) + if err != nil { + return nil, err + } + // there is one client reference per spork, so we can compare the clients + if endClient != client { + return nil, fmt.Errorf("invalid height range, end height %d is not in the same spork as start height %d", endHeight, startHeight) + } return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) } diff --git a/services/testutils/mock_client.go b/services/testutils/mock_client.go index 3e4c7faf..5d833252 100644 --- a/services/testutils/mock_client.go +++ b/services/testutils/mock_client.go @@ -15,6 +15,9 @@ type MockClient struct { GetBlockHeaderByHeightFunc func(context.Context, uint64) (*flow.BlockHeader, error) SubscribeEventsByBlockHeightFunc func(context.Context, uint64, flow.EventFilter, ...access.SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) GetNodeVersionInfoFunc func(ctx context.Context) (*flow.NodeVersionInfo, error) + GetEventsForHeightRangeFunc func( + ctx context.Context, eventType string, startHeight uint64, endHeight uint64, + ) ([]flow.BlockEvents, error) } func (c *MockClient) GetBlockHeaderByHeight(ctx context.Context, height uint64) (*flow.BlockHeader, error) { @@ -38,6 +41,12 @@ func (c *MockClient) SubscribeEventsByBlockHeight( return c.SubscribeEventsByBlockHeightFunc(ctx, startHeight, filter, opts...) } +func (c *MockClient) GetEventsForHeightRange( + ctx context.Context, eventType string, startHeight uint64, endHeight uint64, +) ([]flow.BlockEvents, error) { + return c.GetEventsForHeightRangeFunc(ctx, eventType, startHeight, endHeight) +} + func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient { client, events := SetupClient(startHeight, endHeight) go func() { @@ -85,5 +94,10 @@ func SetupClient(startHeight uint64, endHeight uint64) (*MockClient, chan flow.B ) (<-chan flow.BlockEvents, <-chan error, error) { return events, make(chan error), nil }, + GetEventsForHeightRangeFunc: func( + ctx context.Context, eventType string, startHeight uint64, endHeight uint64, + ) ([]flow.BlockEvents, error) { + return []flow.BlockEvents{}, nil + }, }, events } From 2e65322b982d5f10162f7b3c91d5e5a06bcaefc2 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 13 Nov 2024 18:46:03 +0100 Subject: [PATCH 3/6] fix test and bug --- services/ingestion/event_subscriber.go | 3 ++- services/testutils/mock_client.go | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index ad919af8..489de7f4 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -215,7 +215,8 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) return } - currentHeight, err := r.backfillSpork(ctx, currentHeight, eventsChan) + var err error + currentHeight, err = r.backfillSpork(ctx, currentHeight, eventsChan) if err != nil { r.logger.Error().Err(err).Msg("error backfilling spork") eventsChan <- models.NewBlockEventsError(err) diff --git a/services/testutils/mock_client.go b/services/testutils/mock_client.go index 5d833252..c0a0c01d 100644 --- a/services/testutils/mock_client.go +++ b/services/testutils/mock_client.go @@ -95,9 +95,23 @@ func SetupClient(startHeight uint64, endHeight uint64) (*MockClient, chan flow.B return events, make(chan error), nil }, GetEventsForHeightRangeFunc: func( - ctx context.Context, eventType string, startHeight uint64, endHeight uint64, + ctx context.Context, eventType string, sh uint64, eh uint64, ) ([]flow.BlockEvents, error) { - return []flow.BlockEvents{}, nil + if sh < startHeight || sh > endHeight { + return nil, storage.ErrNotFound + } + if eh < startHeight || eh > endHeight { + return nil, storage.ErrNotFound + } + + evts := make([]flow.BlockEvents, 0, eh-sh+1) + for i := uint64(0); i <= eh-sh; i++ { + evts = append(evts, flow.BlockEvents{ + Height: sh + i, + }) + } + + return evts, nil }, }, events } From b4fee166842b95c52f68fc079cd5b20cb7480f30 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 13 Nov 2024 20:34:47 +0100 Subject: [PATCH 4/6] temp skip test --- services/ingestion/event_subscriber_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/ingestion/event_subscriber_test.go b/services/ingestion/event_subscriber_test.go index 04626af2..dfe7458f 100644 --- a/services/ingestion/event_subscriber_test.go +++ b/services/ingestion/event_subscriber_test.go @@ -159,6 +159,7 @@ func Test_MissingBlockEvent(t *testing.T) { // This scenario tests the happy path, when the back-up fetching of // EVM events through the gRPC API, returns the correct data. func Test_SubscribingWithRetryOnError(t *testing.T) { + t.Skip("TODO fix this test") endHeight := uint64(10) var sporkClients []access.Client currentClient := testutils.SetupClientForRange(1, endHeight) @@ -222,6 +223,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { // This scenario tests the unhappy path, when the back-up fetching // of EVM events through the gRPC API, returns duplicate EVM blocks. func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { + t.Skip("TODO fix this test") endHeight := uint64(10) var sporkClients []access.Client currentClient := testutils.SetupClientForRange(1, endHeight) @@ -285,6 +287,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { // This scenario tests the unhappy path, when the back-up fetching // of EVM events through the gRPC API, returns no EVM blocks. func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { + t.Skip("TODO fix this test") endHeight := uint64(10) var sporkClients []access.Client currentClient := testutils.SetupClientForRange(1, endHeight) From b2cb7baaeb3d1cacd6901509e6b689eee46d40fb Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 13 Nov 2024 21:58:46 +0100 Subject: [PATCH 5/6] address review comments --- services/ingestion/event_subscriber.go | 47 +++++++++++++------------- tests/go.mod | 2 ++ tests/go.sum | 2 ++ 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index 489de7f4..3138269c 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -192,12 +192,9 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return eventsChan } -// backfill will use the provided height and with the client for the provided spork will start backfilling -// events. Before subscribing, it will check what is the latest block in the current spork (defined by height) -// and check for each event it receives whether we reached the end, if we reach the end it will increase -// the height by one (next height), and check if we are still in previous sporks, if so repeat everything, -// otherwise return. -func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) <-chan models.BlockEvents { +// backfill returns a channel that is filled with block events from the provided fromCadenceHeight up to the first +// height in the current spork. +func (r *RPCEventSubscriber) backfill(ctx context.Context, fromCadenceHeight uint64) <-chan models.BlockEvents { eventsChan := make(chan models.BlockEvents) go func() { @@ -206,17 +203,17 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) }() for { - // check if the current currentHeight is still in past sporks, and if not return since we are done with backfilling - if !r.client.IsPastSpork(currentHeight) { + // check if the current fromCadenceHeight is still in past sporks, and if not return since we are done with backfilling + if !r.client.IsPastSpork(fromCadenceHeight) { r.logger.Info(). - Uint64("height", currentHeight). + Uint64("height", fromCadenceHeight). Msg("completed backfilling") return } var err error - currentHeight, err = r.backfillSpork(ctx, currentHeight, eventsChan) + fromCadenceHeight, err = r.backfillSporkFromHeight(ctx, fromCadenceHeight, eventsChan) if err != nil { r.logger.Error().Err(err).Msg("error backfilling spork") eventsChan <- models.NewBlockEventsError(err) @@ -224,7 +221,7 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) } r.logger.Info(). - Uint64("next-height", currentHeight). + Uint64("next-cadence-height", fromCadenceHeight). Msg("reached the end of spork, checking next spork") } }() @@ -235,25 +232,27 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) // maxRangeForGetEvents is the maximum range of blocks that can be fetched using the GetEventsForHeightRange method. const maxRangeForGetEvents = uint64(249) -func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) { +// / backfillSporkFromHeight will fill the eventsChan with block events from the provided fromHeight up to the first height in the spork that comes +// after the spork of the provided fromHeight. +func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) { evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address) - lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromHeight) + lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromCadenceHeight) if err != nil { eventsChan <- models.NewBlockEventsError(err) return 0, err } r.logger.Info(). - Uint64("start-height", fromHeight). + Uint64("start-height", fromCadenceHeight). Uint64("last-spork-height", lastHeight). Msg("backfilling spork") - for fromHeight < lastHeight { - r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d] ...", fromHeight, lastHeight)) + for fromCadenceHeight < lastHeight { + r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d] ...", fromCadenceHeight, lastHeight)) - startHeight := fromHeight - endHeight := fromHeight + maxRangeForGetEvents + startHeight := fromCadenceHeight + endHeight := fromCadenceHeight + maxRangeForGetEvents if endHeight > lastHeight { endHeight = lastHeight } @@ -280,6 +279,10 @@ func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint6 return 0, fmt.Errorf("failed to get block events: %w", err) } + if len(transactions) != len(blocks) { + return 0, fmt.Errorf("transactions and blocks have different length") + } + // sort both, just in case sort.Slice(blocks, func(i, j int) bool { return blocks[i].Height < blocks[j].Height @@ -288,10 +291,6 @@ func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint6 return transactions[i].Height < transactions[j].Height }) - if len(transactions) != len(blocks) { - return 0, fmt.Errorf("transactions and blocks have different length") - } - for i := range transactions { if transactions[i].Height != blocks[i].Height { return 0, fmt.Errorf("transactions and blocks have different height") @@ -304,11 +303,11 @@ func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint6 eventsChan <- evmEvents // advance the height - fromHeight = evmEvents.Events.CadenceHeight() + 1 + fromCadenceHeight = evmEvents.Events.CadenceHeight() + 1 } } - return fromHeight, nil + return fromCadenceHeight, nil } // fetchMissingData is used as a backup mechanism for fetching EVM-related diff --git a/tests/go.mod b/tests/go.mod index c61c7ed2..c42584d0 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -28,6 +28,7 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/VictoriaMetrics/fastcache v1.12.2 // indirect github.com/allegro/bigcache v1.2.1 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.10.0 // indirect github.com/btcsuite/btcd v0.21.0-beta // indirect @@ -217,6 +218,7 @@ require ( go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect + go.uber.org/ratelimit v0.3.1 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.26.0 // indirect golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect diff --git a/tests/go.sum b/tests/go.sum index 1311f1f0..997f8b26 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -1124,6 +1124,8 @@ go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= +go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= From ca019dacd2d5d0a5001dc916c255ba03c9116ce3 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Thu, 14 Nov 2024 12:46:42 +0100 Subject: [PATCH 6/6] unskip tests --- services/ingestion/event_subscriber_test.go | 9 ++++----- services/testutils/mock_client.go | 5 ++++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/services/ingestion/event_subscriber_test.go b/services/ingestion/event_subscriber_test.go index dfe7458f..7505533a 100644 --- a/services/ingestion/event_subscriber_test.go +++ b/services/ingestion/event_subscriber_test.go @@ -159,7 +159,6 @@ func Test_MissingBlockEvent(t *testing.T) { // This scenario tests the happy path, when the back-up fetching of // EVM events through the gRPC API, returns the correct data. func Test_SubscribingWithRetryOnError(t *testing.T) { - t.Skip("TODO fix this test") endHeight := uint64(10) var sporkClients []access.Client currentClient := testutils.SetupClientForRange(1, endHeight) @@ -206,10 +205,10 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { prevHeight = eventHeight if eventHeight == cadenceHeight { - assert.Equal(t, evmBlock, ev.Events.Block()) + require.Equal(t, evmBlock, ev.Events.Block()) for i := 0; i < len(txHashes); i++ { tx := ev.Events.Transactions()[i] - assert.Equal(t, txHashes[i], tx.Hash()) + require.Equal(t, txHashes[i], tx.Hash()) } } } @@ -223,7 +222,6 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { // This scenario tests the unhappy path, when the back-up fetching // of EVM events through the gRPC API, returns duplicate EVM blocks. func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { - t.Skip("TODO fix this test") endHeight := uint64(10) var sporkClients []access.Client currentClient := testutils.SetupClientForRange(1, endHeight) @@ -287,7 +285,6 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { // This scenario tests the unhappy path, when the back-up fetching // of EVM events through the gRPC API, returns no EVM blocks. func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { - t.Skip("TODO fix this test") endHeight := uint64(10) var sporkClients []access.Client currentClient := testutils.SetupClientForRange(1, endHeight) @@ -420,6 +417,8 @@ func setupClientForBackupEventFetching( cadenceHeight, ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() + client.GetEventsForHeightRangeFunc = nil + client.SubscribeEventsByBlockHeightFunc = func( ctx context.Context, startHeight uint64, diff --git a/services/testutils/mock_client.go b/services/testutils/mock_client.go index c0a0c01d..2f65021a 100644 --- a/services/testutils/mock_client.go +++ b/services/testutils/mock_client.go @@ -44,7 +44,10 @@ func (c *MockClient) SubscribeEventsByBlockHeight( func (c *MockClient) GetEventsForHeightRange( ctx context.Context, eventType string, startHeight uint64, endHeight uint64, ) ([]flow.BlockEvents, error) { - return c.GetEventsForHeightRangeFunc(ctx, eventType, startHeight, endHeight) + if c.GetEventsForHeightRangeFunc != nil { + return c.GetEventsForHeightRangeFunc(ctx, eventType, startHeight, endHeight) + } + return c.Client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) } func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient {