Skip to content

Commit

Permalink
pull start height out of event subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Oct 29, 2024
1 parent 2b52189 commit 197617c
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 61 deletions.
7 changes: 1 addition & 6 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
Msg("indexing cadence height information")

// create event subscriber
subscriber := ingestion.NewRPCEventSubscriber(
b.client,
b.config.HeartbeatInterval,
b.config.FlowNetworkID,
b.logger,
)
subscriber := ingestion.NewRPCEventSubscriber(b.logger, b.client, b.config.FlowNetworkID, latestCadenceHeight, b.config.HeartbeatInterval)

// initialize event ingestion engine
b.events = ingestion.NewEventIngestionEngine(
Expand Down
14 changes: 4 additions & 10 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,19 @@ func (e *Engine) Stop() {
// drops.
// All other errors are unexpected.
func (e *Engine) Run(ctx context.Context) error {
latestCadence, err := e.blocks.LatestCadenceHeight()
if err != nil {
return fmt.Errorf("failed to get latest cadence height: %w", err)
}

e.log.Info().Uint64("start-cadence-height", latestCadence).Msg("starting ingestion")
e.log.Info().Msg("starting ingestion")

e.MarkReady()

for events := range e.subscriber.Subscribe(ctx, latestCadence) {
for events := range e.subscriber.Subscribe(ctx) {
if events.Err != nil {
return fmt.Errorf(
"failure in event subscription at height %d, with: %w",
latestCadence,
"failure in event subscription with: %w",
events.Err,
)
}

err = e.processEvents(events.Events)
err := e.processEvents(events.Events)
if err != nil {
e.log.Error().Err(err).Msg("failed to process EVM events")
return err
Expand Down
21 changes: 10 additions & 11 deletions services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func TestSerialBlockIngestion(t *testing.T) {

subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
})

Expand Down Expand Up @@ -136,8 +136,8 @@ func TestSerialBlockIngestion(t *testing.T) {
eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
})

Expand Down Expand Up @@ -246,8 +246,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
})

Expand Down Expand Up @@ -349,8 +349,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
})

Expand Down Expand Up @@ -448,9 +448,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
assert.Equal(t, latestCadenceHeight, latest)
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
}).
Once()
Expand Down
25 changes: 14 additions & 11 deletions services/ingestion/event_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type EventSubscriber interface {
//
// The BlockEvents type will contain an optional error in case
// the error happens, the consumer of the chanel should handle it.
Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents
Subscribe(ctx context.Context) <-chan models.BlockEvents
}

var _ EventSubscriber = &RPCEventSubscriber{}
Expand All @@ -40,16 +40,18 @@ type RPCEventSubscriber struct {

client *requester.CrossSporkClient
chain flowGo.ChainID
height uint64

recovery bool
recoveredEvents []flow.Event
}

func NewRPCEventSubscriber(
logger zerolog.Logger,
client *requester.CrossSporkClient,
heartbeatInterval uint64,
chainID flowGo.ChainID,
logger zerolog.Logger,
startHeight uint64,
heartbeatInterval uint64,
) *RPCEventSubscriber {
logger = logger.With().Str("component", "subscriber").Logger()
return &RPCEventSubscriber{
Expand All @@ -61,6 +63,7 @@ func NewRPCEventSubscriber(

client: client,
chain: chainID,
height: startHeight,
}
}

Expand All @@ -69,7 +72,7 @@ func NewRPCEventSubscriber(
// to listen all new events in the current spork.
//
// If error is encountered during backfill the subscription will end and the response chanel will be closed.
func (r *RPCEventSubscriber) Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

go func() {
Expand All @@ -78,13 +81,13 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context, height uint64) <-cha
}()

// if the height is from the previous spork, backfill all the eventsChan from previous sporks first
if r.client.IsPastSpork(height) {
if r.client.IsPastSpork(r.height) {
r.logger.Info().
Uint64("height", height).
Uint64("height", r.height).
Msg("height found in previous spork, starting to backfill")

// backfill all the missed eventsChan, handling of context cancellation is done by the producer
for ev := range r.backfill(ctx, height) {
for ev := range r.backfill(ctx, r.height) {
eventsChan <- ev

if ev.Err != nil {
Expand All @@ -93,21 +96,21 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context, height uint64) <-cha

// keep updating height, so after we are done back-filling
// it will be at the first height in the current spork
height = ev.Events.CadenceHeight()
r.height = ev.Events.CadenceHeight()
}

// after back-filling is done, increment height by one,
// so we start with the height in the current spork
height = height + 1
r.height = r.height + 1
}

r.logger.Info().
Uint64("next-height", height).
Uint64("next-height", r.height).
Msg("backfilling done, subscribe for live data")

// subscribe in the current spork, handling of context cancellation is done by the producer
// TODO(JanezP): I think the heartbeat interval should always be 1 here
for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(r.HeartbeatInterval)) {
for ev := range r.subscribe(ctx, r.height, access.WithHeartbeatInterval(r.HeartbeatInterval)) {
eventsChan <- ev
}

Expand Down
36 changes: 18 additions & 18 deletions services/ingestion/event_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func Test_Subscribing(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)

events := subscriber.Subscribe(context.Background(), 1)
events := subscriber.Subscribe(context.Background())

var prevHeight uint64

Expand Down Expand Up @@ -83,9 +83,9 @@ func Test_MissingBlockEvent(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)

events := subscriber.Subscribe(context.Background(), 1)
events := subscriber.Subscribe(context.Background())

missingHashes := make([]gethCommon.Hash, 0)

Expand Down Expand Up @@ -160,7 +160,7 @@ func Test_MissingBlockEvent(t *testing.T) {
// EVM events through the gRPC API, returns the correct data.
func Test_SubscribingWithRetryOnError(t *testing.T) {
endHeight := uint64(10)
sporkClients := []access.Client{}
var sporkClients []access.Client
currentClient := testutils.SetupClientForRange(1, endHeight)

cadenceHeight := uint64(5)
Expand All @@ -185,9 +185,9 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)

events := subscriber.Subscribe(context.Background(), 1)
events := subscriber.Subscribe(context.Background())

var prevHeight uint64

Expand All @@ -214,7 +214,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
}

// this makes sure we indexed all the events
require.Equal(t, uint64(endHeight), prevHeight)
require.Equal(t, endHeight, prevHeight)
}

// Test that back-up fetching of EVM events is triggered when the
Expand All @@ -223,7 +223,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
// of EVM events through the gRPC API, returns duplicate EVM blocks.
func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
endHeight := uint64(10)
sporkClients := []access.Client{}
var sporkClients []access.Client
currentClient := testutils.SetupClientForRange(1, endHeight)

cadenceHeight := uint64(5)
Expand All @@ -248,9 +248,9 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)

events := subscriber.Subscribe(context.Background(), 1)
events := subscriber.Subscribe(context.Background())

var prevHeight uint64

Expand Down Expand Up @@ -286,7 +286,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
// of EVM events through the gRPC API, returns no EVM blocks.
func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
endHeight := uint64(10)
sporkClients := []access.Client{}
var sporkClients []access.Client
currentClient := testutils.SetupClientForRange(1, endHeight)

cadenceHeight := uint64(5)
Expand All @@ -310,9 +310,9 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
)
require.NoError(t, err)

subscriber := NewRPCEventSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, 1, 100)

events := subscriber.Subscribe(context.Background(), 1)
events := subscriber.Subscribe(context.Background())

var prevHeight uint64

Expand Down Expand Up @@ -405,16 +405,16 @@ func setupClientForBackupEventFetching(
"GetEventsForHeightRange",
mock.AnythingOfType("context.backgroundCtx"),
"A.b6763b4399a888c8.EVM.BlockExecuted",
uint64(cadenceHeight),
uint64(cadenceHeight),
cadenceHeight,
cadenceHeight,
).Return(evmBlockEvents, nil).Once()

client.On(
"GetEventsForHeightRange",
mock.AnythingOfType("context.backgroundCtx"),
"A.b6763b4399a888c8.EVM.TransactionExecuted",
uint64(cadenceHeight),
uint64(cadenceHeight),
cadenceHeight,
cadenceHeight,
).Return([]flow.BlockEvents{evmTxEvents}, nil).Once()

client.SubscribeEventsByBlockHeightFunc = func(
Expand Down
10 changes: 5 additions & 5 deletions services/ingestion/mocks/EventSubscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 197617c

Please sign in to comment.