From 197617ce17c20d201fe646dc5f359417710bf546 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Tue, 29 Oct 2024 16:24:23 +0100 Subject: [PATCH] pull start height out of event subscriber --- bootstrap/bootstrap.go | 7 +--- services/ingestion/engine.go | 14 +++----- services/ingestion/engine_test.go | 21 ++++++------ services/ingestion/event_subscriber.go | 25 +++++++------- services/ingestion/event_subscriber_test.go | 36 ++++++++++----------- services/ingestion/mocks/EventSubscriber.go | 10 +++--- 6 files changed, 52 insertions(+), 61 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index bc8ccc1d..df8e53a7 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -117,12 +117,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error { Msg("indexing cadence height information") // create event subscriber - subscriber := ingestion.NewRPCEventSubscriber( - b.client, - b.config.HeartbeatInterval, - b.config.FlowNetworkID, - b.logger, - ) + subscriber := ingestion.NewRPCEventSubscriber(b.logger, b.client, b.config.FlowNetworkID, latestCadenceHeight, b.config.HeartbeatInterval) // initialize event ingestion engine b.events = ingestion.NewEventIngestionEngine( diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 37aeae0b..62bde449 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -98,25 +98,19 @@ func (e *Engine) Stop() { // drops. // All other errors are unexpected. func (e *Engine) Run(ctx context.Context) error { - latestCadence, err := e.blocks.LatestCadenceHeight() - if err != nil { - return fmt.Errorf("failed to get latest cadence height: %w", err) - } - - e.log.Info().Uint64("start-cadence-height", latestCadence).Msg("starting ingestion") + e.log.Info().Msg("starting ingestion") e.MarkReady() - for events := range e.subscriber.Subscribe(ctx, latestCadence) { + for events := range e.subscriber.Subscribe(ctx) { if events.Err != nil { return fmt.Errorf( - "failure in event subscription at height %d, with: %w", - latestCadence, + "failure in event subscription with: %w", events.Err, ) } - err = e.processEvents(events.Events) + err := e.processEvents(events.Events) if err != nil { e.log.Error().Err(err).Msg("failed to process EVM events") return err diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index c7f6a77b..0fddb5e7 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -56,8 +56,8 @@ func TestSerialBlockIngestion(t *testing.T) { subscriber := &mocks.EventSubscriber{} subscriber. - On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { + On("Subscribe", mock.Anything). + Return(func(ctx context.Context) <-chan models.BlockEvents { return eventsChan }) @@ -136,8 +136,8 @@ func TestSerialBlockIngestion(t *testing.T) { eventsChan := make(chan models.BlockEvents) subscriber := &mocks.EventSubscriber{} subscriber. - On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { + On("Subscribe", mock.Anything). + Return(func(ctx context.Context) <-chan models.BlockEvents { return eventsChan }) @@ -246,8 +246,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { eventsChan := make(chan models.BlockEvents) subscriber := &mocks.EventSubscriber{} subscriber. - On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { + On("Subscribe", mock.Anything). + Return(func(ctx context.Context) <-chan models.BlockEvents { return eventsChan }) @@ -349,8 +349,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { eventsChan := make(chan models.BlockEvents) subscriber := &mocks.EventSubscriber{} subscriber. - On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { + On("Subscribe", mock.Anything). + Return(func(ctx context.Context) <-chan models.BlockEvents { return eventsChan }) @@ -448,9 +448,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { eventsChan := make(chan models.BlockEvents) subscriber := &mocks.EventSubscriber{} subscriber. - On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")). - Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents { - assert.Equal(t, latestCadenceHeight, latest) + On("Subscribe", mock.Anything). + Return(func(ctx context.Context) <-chan models.BlockEvents { return eventsChan }). Once() diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index 24fc4dd7..d8508e68 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -24,7 +24,7 @@ type EventSubscriber interface { // // The BlockEvents type will contain an optional error in case // the error happens, the consumer of the chanel should handle it. - Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents + Subscribe(ctx context.Context) <-chan models.BlockEvents } var _ EventSubscriber = &RPCEventSubscriber{} @@ -40,16 +40,18 @@ type RPCEventSubscriber struct { client *requester.CrossSporkClient chain flowGo.ChainID + height uint64 recovery bool recoveredEvents []flow.Event } func NewRPCEventSubscriber( + logger zerolog.Logger, client *requester.CrossSporkClient, - heartbeatInterval uint64, chainID flowGo.ChainID, - logger zerolog.Logger, + startHeight uint64, + heartbeatInterval uint64, ) *RPCEventSubscriber { logger = logger.With().Str("component", "subscriber").Logger() return &RPCEventSubscriber{ @@ -61,6 +63,7 @@ func NewRPCEventSubscriber( client: client, chain: chainID, + height: startHeight, } } @@ -69,7 +72,7 @@ func NewRPCEventSubscriber( // to listen all new events in the current spork. // // If error is encountered during backfill the subscription will end and the response chanel will be closed. -func (r *RPCEventSubscriber) Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents { +func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents { eventsChan := make(chan models.BlockEvents) go func() { @@ -78,13 +81,13 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context, height uint64) <-cha }() // if the height is from the previous spork, backfill all the eventsChan from previous sporks first - if r.client.IsPastSpork(height) { + if r.client.IsPastSpork(r.height) { r.logger.Info(). - Uint64("height", height). + Uint64("height", r.height). Msg("height found in previous spork, starting to backfill") // backfill all the missed eventsChan, handling of context cancellation is done by the producer - for ev := range r.backfill(ctx, height) { + for ev := range r.backfill(ctx, r.height) { eventsChan <- ev if ev.Err != nil { @@ -93,21 +96,21 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context, height uint64) <-cha // keep updating height, so after we are done back-filling // it will be at the first height in the current spork - height = ev.Events.CadenceHeight() + r.height = ev.Events.CadenceHeight() } // after back-filling is done, increment height by one, // so we start with the height in the current spork - height = height + 1 + r.height = r.height + 1 } r.logger.Info(). - Uint64("next-height", height). + Uint64("next-height", r.height). Msg("backfilling done, subscribe for live data") // subscribe in the current spork, handling of context cancellation is done by the producer // TODO(JanezP): I think the heartbeat interval should always be 1 here - for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(r.HeartbeatInterval)) { + for ev := range r.subscribe(ctx, r.height, access.WithHeartbeatInterval(r.HeartbeatInterval)) { eventsChan <- ev } diff --git a/services/ingestion/event_subscriber_test.go b/services/ingestion/event_subscriber_test.go index 22865728..19151ea2 100644 --- a/services/ingestion/event_subscriber_test.go +++ b/services/ingestion/event_subscriber_test.go @@ -43,9 +43,9 @@ func Test_Subscribing(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) - events := subscriber.Subscribe(context.Background(), 1) + events := subscriber.Subscribe(context.Background()) var prevHeight uint64 @@ -83,9 +83,9 @@ func Test_MissingBlockEvent(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) - events := subscriber.Subscribe(context.Background(), 1) + events := subscriber.Subscribe(context.Background()) missingHashes := make([]gethCommon.Hash, 0) @@ -160,7 +160,7 @@ func Test_MissingBlockEvent(t *testing.T) { // EVM events through the gRPC API, returns the correct data. func Test_SubscribingWithRetryOnError(t *testing.T) { endHeight := uint64(10) - sporkClients := []access.Client{} + var sporkClients []access.Client currentClient := testutils.SetupClientForRange(1, endHeight) cadenceHeight := uint64(5) @@ -185,9 +185,9 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) - events := subscriber.Subscribe(context.Background(), 1) + events := subscriber.Subscribe(context.Background()) var prevHeight uint64 @@ -214,7 +214,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { } // this makes sure we indexed all the events - require.Equal(t, uint64(endHeight), prevHeight) + require.Equal(t, endHeight, prevHeight) } // Test that back-up fetching of EVM events is triggered when the @@ -223,7 +223,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) { // of EVM events through the gRPC API, returns duplicate EVM blocks. func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { endHeight := uint64(10) - sporkClients := []access.Client{} + var sporkClients []access.Client currentClient := testutils.SetupClientForRange(1, endHeight) cadenceHeight := uint64(5) @@ -248,9 +248,9 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) - events := subscriber.Subscribe(context.Background(), 1) + events := subscriber.Subscribe(context.Background()) var prevHeight uint64 @@ -286,7 +286,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) { // of EVM events through the gRPC API, returns no EVM blocks. func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { endHeight := uint64(10) - sporkClients := []access.Client{} + var sporkClients []access.Client currentClient := testutils.SetupClientForRange(1, endHeight) cadenceHeight := uint64(5) @@ -310,9 +310,9 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) { ) require.NoError(t, err) - subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100) - events := subscriber.Subscribe(context.Background(), 1) + events := subscriber.Subscribe(context.Background()) var prevHeight uint64 @@ -405,16 +405,16 @@ func setupClientForBackupEventFetching( "GetEventsForHeightRange", mock.AnythingOfType("context.backgroundCtx"), "A.b6763b4399a888c8.EVM.BlockExecuted", - uint64(cadenceHeight), - uint64(cadenceHeight), + cadenceHeight, + cadenceHeight, ).Return(evmBlockEvents, nil).Once() client.On( "GetEventsForHeightRange", mock.AnythingOfType("context.backgroundCtx"), "A.b6763b4399a888c8.EVM.TransactionExecuted", - uint64(cadenceHeight), - uint64(cadenceHeight), + cadenceHeight, + cadenceHeight, ).Return([]flow.BlockEvents{evmTxEvents}, nil).Once() client.SubscribeEventsByBlockHeightFunc = func( diff --git a/services/ingestion/mocks/EventSubscriber.go b/services/ingestion/mocks/EventSubscriber.go index 021c708d..11b05e89 100644 --- a/services/ingestion/mocks/EventSubscriber.go +++ b/services/ingestion/mocks/EventSubscriber.go @@ -15,17 +15,17 @@ type EventSubscriber struct { mock.Mock } -// Subscribe provides a mock function with given fields: ctx, height -func (_m *EventSubscriber) Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents { - ret := _m.Called(ctx, height) +// Subscribe provides a mock function with given fields: ctx +func (_m *EventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents { + ret := _m.Called(ctx) if len(ret) == 0 { panic("no return value specified for Subscribe") } var r0 <-chan models.BlockEvents - if rf, ok := ret.Get(0).(func(context.Context, uint64) <-chan models.BlockEvents); ok { - r0 = rf(ctx, height) + if rf, ok := ret.Get(0).(func(context.Context) <-chan models.BlockEvents); ok { + r0 = rf(ctx) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(<-chan models.BlockEvents)