Skip to content

Commit

Permalink
Handle missing EVM block event in backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
m-Peter committed Nov 15, 2024
1 parent fe346c4 commit 79e5010
Showing 1 changed file with 83 additions and 12 deletions.
95 changes: 83 additions & 12 deletions services/ingestion/event_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,18 @@ const maxRangeForGetEvents = uint64(249)
func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)

blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromCadenceHeight)
if err != nil {
eventsChan <- models.NewBlockEventsError(err)
Expand All @@ -257,18 +269,6 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa
endHeight = lastHeight
}

blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
if err != nil {
return 0, fmt.Errorf("failed to get block events: %w", err)
Expand Down Expand Up @@ -309,6 +309,12 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa
blocks[i].Events = append(blocks[i].Events, txEvents...)

evmEvents := models.NewBlockEvents(blocks[i])
if evmEvents.Err != nil && errors.Is(evmEvents.Err, errs.ErrMissingBlock) {
evmEvents, err = r.accumulateBlockEvents(ctx, blocks[i], blockExecutedEvent, transactionExecutedEvent)
if err != nil {
return fromCadenceHeight, err
}
}
eventsChan <- evmEvents

// advance the height
Expand All @@ -319,6 +325,71 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa
return fromCadenceHeight, nil
}

func (r *RPCEventSubscriber) accumulateBlockEvents(
ctx context.Context,
block flow.BlockEvents,
blockExecutedEventType string,
txExecutedEventType string,
) (models.BlockEvents, error) {
evmEvents := models.NewBlockEvents(block)
for evmEvents.Err != nil && errors.Is(evmEvents.Err, errs.ErrMissingBlock) {
blocks, err := r.client.GetEventsForHeightRange(
ctx,
blockExecutedEventType,
block.Height,
block.Height,
)
if err != nil {
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
}

transactions, err := r.client.GetEventsForHeightRange(
ctx,
txExecutedEventType,
block.Height,
block.Height,
)
if err != nil {
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
}

if len(transactions) != len(blocks) {
return models.BlockEvents{}, fmt.Errorf("transactions and blocks have different length")
}

// sort both, just in case
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Height < blocks[j].Height
})
sort.Slice(transactions, func(i, j int) bool {
return transactions[i].Height < transactions[j].Height
})

for i := range transactions {
if transactions[i].Height != blocks[i].Height {
return models.BlockEvents{}, fmt.Errorf("transactions and blocks have different height")
}

// append the transaction events to the block events
// first we sort all the events in the block, by their TransactionIndex,
// and then we also sort events in the same transaction, by their EventIndex.
txEvents := transactions[i].Events
sort.Slice(txEvents, func(i, j int) bool {
if txEvents[i].TransactionIndex != txEvents[j].TransactionIndex {
return txEvents[i].TransactionIndex < txEvents[j].TransactionIndex
}
return txEvents[i].EventIndex < txEvents[j].EventIndex
})
blocks[i].Events = append(blocks[i].Events, txEvents...)

block = blocks[i]
evmEvents = models.NewBlockEvents(block)
}
}

return evmEvents, nil
}

// fetchMissingData is used as a backup mechanism for fetching EVM-related
// events, when the event streaming API returns an inconsistent response.
// An inconsistent response could be an EVM block that references EVM
Expand Down

0 comments on commit 79e5010

Please sign in to comment.