Skip to content

Commit

Permalink
Implement BlockProvider for block snapshotting
Browse files Browse the repository at this point in the history
  • Loading branch information
m-Peter committed Oct 30, 2024
1 parent 89510bc commit 3cbf6ae
Show file tree
Hide file tree
Showing 6 changed files with 457 additions and 4 deletions.
27 changes: 27 additions & 0 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bootstrap

import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/onflow/go-ethereum/eth/tracers"
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter/memorystore"
grpcOpts "google.golang.org/grpc"
Expand All @@ -21,10 +23,20 @@ import (
"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-evm-gateway/services/ingestion"
"github.com/onflow/flow-evm-gateway/services/replayer"
"github.com/onflow/flow-evm-gateway/services/requester"
"github.com/onflow/flow-evm-gateway/services/traces"
"github.com/onflow/flow-evm-gateway/storage"
"github.com/onflow/flow-evm-gateway/storage/pebble"

// this import is needed for side-effects, because the
// tracers.DefaultDirectory is relying on the init function
_ "github.com/onflow/go-ethereum/eth/tracers/native"
)

const (
callTracerConfig = `{ "onlyTopCall": true }`
callTracerName = "callTracer"
)

type Storages struct {
Expand Down Expand Up @@ -124,9 +136,24 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
latestCadenceHeight,
)

tracer, err := tracers.DefaultDirectory.New(
callTracerName,
&tracers.Context{},
json.RawMessage(callTracerConfig),
)
if err != nil {
return err
}
blocksProvider := replayer.NewBlocksProvider(
b.storages.Blocks,
b.config.FlowNetworkID,
tracer,
)

// initialize event ingestion engine
b.events = ingestion.NewEventIngestionEngine(
subscriber,
blocksProvider,
b.storages.Storage,
b.storages.Blocks,
b.storages.Receipts,
Expand Down
12 changes: 12 additions & 0 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/models"
"github.com/onflow/flow-evm-gateway/services/replayer"
"github.com/onflow/flow-evm-gateway/storage"
"github.com/onflow/flow-evm-gateway/storage/pebble"
)
Expand All @@ -35,6 +36,7 @@ type Engine struct {
*models.EngineStatus

subscriber EventSubscriber
blocksProvider *replayer.BlocksProvider
store *pebble.Storage
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
Expand All @@ -49,6 +51,7 @@ type Engine struct {

func NewEventIngestionEngine(
subscriber EventSubscriber,
blocksProvider *replayer.BlocksProvider,
store *pebble.Storage,
blocks storage.BlockIndexer,
receipts storage.ReceiptIndexer,
Expand All @@ -65,6 +68,7 @@ func NewEventIngestionEngine(
EngineStatus: models.NewEngineStatus(),

subscriber: subscriber,
blocksProvider: blocksProvider,
store: store,
blocks: blocks,
receipts: receipts,
Expand Down Expand Up @@ -179,6 +183,14 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return fmt.Errorf("failed to index receipts for block %d event: %w", events.Block().Height, err)
}

if err := e.blocksProvider.OnBlockReceived(events.Block()); err != nil {
return fmt.Errorf(
"failed to call OnBlockReceived for block %d, with: %w",
events.Block().Height,
err,
)
}

if err := batch.Commit(pebbleDB.Sync); err != nil {
return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err)
}
Expand Down
6 changes: 6 additions & 0 deletions services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/services/ingestion/mocks"
"github.com/onflow/flow-evm-gateway/services/replayer"
"github.com/onflow/flow-evm-gateway/storage/pebble"

"github.com/onflow/cadence"
Expand Down Expand Up @@ -63,6 +64,7 @@ func TestSerialBlockIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down Expand Up @@ -143,6 +145,7 @@ func TestSerialBlockIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down Expand Up @@ -258,6 +261,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down Expand Up @@ -361,6 +365,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down Expand Up @@ -456,6 +461,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down
112 changes: 112 additions & 0 deletions services/replayer/blocks_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package replayer

import (
"fmt"

"github.com/onflow/flow-evm-gateway/models"
"github.com/onflow/flow-evm-gateway/storage"
evmTypes "github.com/onflow/flow-go/fvm/evm/types"
flowGo "github.com/onflow/flow-go/model/flow"
gethCommon "github.com/onflow/go-ethereum/common"
"github.com/onflow/go-ethereum/eth/tracers"
)

type blockSnapshot struct {
*BlocksProvider
block models.Block
}

var _ evmTypes.BlockSnapshot = (*blockSnapshot)(nil)

func (bs *blockSnapshot) BlockContext() (evmTypes.BlockContext, error) {
return evmTypes.BlockContext{
ChainID: evmTypes.EVMChainIDFromFlowChainID(bs.chainID),
BlockNumber: bs.block.Height,
BlockTimestamp: bs.block.Timestamp,
DirectCallBaseGasUsage: evmTypes.DefaultDirectCallBaseGasUsage,
DirectCallGasPrice: evmTypes.DefaultDirectCallGasPrice,
GasFeeCollector: evmTypes.CoinbaseAddress,
GetHashFunc: func(n uint64) gethCommon.Hash {
// For block heights greater than or equal to the current,
// return an empty block hash.
if n >= bs.block.Height {
return gethCommon.Hash{}
}
// If the given block height, is more than 256 blocks
// in the past, return an empty block hash.
if bs.block.Height-n > 256 {
return gethCommon.Hash{}
}

block, err := bs.blocks.GetByHeight(n)
if err != nil {
return gethCommon.Hash{}
}
blockHash, err := block.Hash()
if err != nil {
return gethCommon.Hash{}
}

return blockHash
},
Random: bs.block.PrevRandao,
Tracer: bs.tracer,
}, nil
}

type BlocksProvider struct {
blocks storage.BlockIndexer
chainID flowGo.ChainID
tracer *tracers.Tracer
latestBlock *models.Block
}

var _ evmTypes.BlockSnapshotProvider = (*BlocksProvider)(nil)

func NewBlocksProvider(
blocks storage.BlockIndexer,
chainID flowGo.ChainID,
tracer *tracers.Tracer,
) *BlocksProvider {
return &BlocksProvider{
blocks: blocks,
chainID: chainID,
tracer: tracer,
}
}

func (bp *BlocksProvider) OnBlockReceived(block *models.Block) error {
if bp.latestBlock != nil && bp.latestBlock.Height != (block.Height-1) {
return fmt.Errorf(
"received new block: %d, non-sequential of latest block: %d",
block.Height,
bp.latestBlock.Height,
)
}

bp.latestBlock = block

return nil
}

func (bp *BlocksProvider) GetSnapshotAt(height uint64) (
evmTypes.BlockSnapshot,
error,
) {
if bp.latestBlock != nil && bp.latestBlock.Height == height {
return &blockSnapshot{
BlocksProvider: bp,
block: *bp.latestBlock,
}, nil
}

block, err := bp.blocks.GetByHeight(height)
if err != nil {
return nil, err
}

return &blockSnapshot{
BlocksProvider: bp,
block: *block,
}, nil
}
Loading

0 comments on commit 3cbf6ae

Please sign in to comment.