diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index dbed6f48..6070d940 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -2,6 +2,7 @@ package bootstrap import ( "context" + "encoding/json" "errors" "fmt" "math" @@ -11,6 +12,7 @@ import ( "github.com/onflow/flow-go-sdk/access/grpc" "github.com/onflow/flow-go-sdk/crypto" gethTypes "github.com/onflow/go-ethereum/core/types" + "github.com/onflow/go-ethereum/eth/tracers" "github.com/rs/zerolog" "github.com/sethvargo/go-limiter/memorystore" grpcOpts "google.golang.org/grpc" @@ -25,6 +27,15 @@ import ( "github.com/onflow/flow-evm-gateway/services/traces" "github.com/onflow/flow-evm-gateway/storage" "github.com/onflow/flow-evm-gateway/storage/pebble" + + // this import is needed for side-effects, because the + // tracers.DefaultDirectory is relying on the init function + _ "github.com/onflow/go-ethereum/eth/tracers/native" +) + +const ( + callTracerConfig = `{ "onlyTopCall": true }` + callTracerName = "callTracer" ) type Storages struct { @@ -124,9 +135,24 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error { b.logger, ) + tracer, err := tracers.DefaultDirectory.New( + callTracerName, + &tracers.Context{}, + json.RawMessage(callTracerConfig), + ) + if err != nil { + return err + } + blocksProvider := pebble.NewBlocksProvider( + b.storages.Blocks, + b.config.FlowNetworkID, + tracer, + ) + // initialize event ingestion engine b.events = ingestion.NewEventIngestionEngine( subscriber, + blocksProvider, b.storages.Storage, b.storages.Blocks, b.storages.Receipts, diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 37aeae0b..4a89dfa7 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -35,6 +35,7 @@ type Engine struct { *models.EngineStatus subscriber EventSubscriber + blocksProvider *pebble.BlocksProvider store *pebble.Storage blocks storage.BlockIndexer receipts storage.ReceiptIndexer @@ -49,6 +50,7 @@ type Engine struct { func NewEventIngestionEngine( subscriber EventSubscriber, + blocksProvider *pebble.BlocksProvider, store *pebble.Storage, blocks storage.BlockIndexer, receipts storage.ReceiptIndexer, @@ -65,6 +67,7 @@ func NewEventIngestionEngine( EngineStatus: models.NewEngineStatus(), subscriber: subscriber, + blocksProvider: blocksProvider, store: store, blocks: blocks, receipts: receipts, @@ -185,6 +188,22 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { return fmt.Errorf("failed to index receipts for block %d event: %w", events.Block().Height, err) } + if err := e.blocksProvider.OnBlockReceived(events.Block()); err != nil { + return fmt.Errorf( + "failed to call OnBlockReceived for block %d, with: %w", + events.Block().Height, + err, + ) + } + + if err := e.blocksProvider.OnBlockExecuted(events.Block().Height, &pebble.ResultsCollector{}); err != nil { + return fmt.Errorf( + "failed to call OnBlockExecuted for block %d, with: %w", + events.Block().Height, + err, + ) + } + if err := batch.Commit(pebbleDB.Sync); err != nil { return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err) } diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index c7f6a77b..22f4408f 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -63,6 +63,7 @@ func TestSerialBlockIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil), store, blocks, receipts, @@ -143,6 +144,7 @@ func TestSerialBlockIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil), store, blocks, receipts, @@ -258,6 +260,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil), store, blocks, receipts, @@ -361,6 +364,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil), store, blocks, receipts, @@ -457,6 +461,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil), store, blocks, receipts, diff --git a/storage/pebble/blocks_provider.go b/storage/pebble/blocks_provider.go new file mode 100644 index 00000000..e4689633 --- /dev/null +++ b/storage/pebble/blocks_provider.go @@ -0,0 +1,144 @@ +package pebble + +import ( + "fmt" + + "github.com/onflow/flow-evm-gateway/models" + "github.com/onflow/flow-evm-gateway/storage" + evmTypes "github.com/onflow/flow-go/fvm/evm/types" + flowGo "github.com/onflow/flow-go/model/flow" + gethCommon "github.com/onflow/go-ethereum/common" + "github.com/onflow/go-ethereum/eth/tracers" +) + +type ResultsCollector struct{} + +func (s *ResultsCollector) StorageRegisterUpdates() map[flowGo.RegisterID]flowGo.RegisterValue { + return map[flowGo.RegisterID]flowGo.RegisterValue{} +} + +var _ evmTypes.ReplayResultCollector = (*ResultsCollector)(nil) + +type BlocksProvider struct { + blocks storage.BlockIndexer + chainID flowGo.ChainID + tracer *tracers.Tracer + latestBlock *models.Block +} + +var _ evmTypes.BlockSnapshotProvider = (*BlocksProvider)(nil) +var _ evmTypes.BlockSnapshot = (*BlocksProvider)(nil) + +func NewBlocksProvider( + blocks storage.BlockIndexer, + chainID flowGo.ChainID, + tracer *tracers.Tracer, +) *BlocksProvider { + return &BlocksProvider{ + blocks: blocks, + chainID: chainID, + tracer: tracer, + } +} + +func (bp *BlocksProvider) OnBlockReceived(block *models.Block) error { + if bp.latestBlock != nil { + return fmt.Errorf( + "received new block: %d, while still processing latest block: %d", + block.Height, + bp.latestBlock.Height, + ) + } + + bp.latestBlock = block + + return nil +} + +func (bp *BlocksProvider) OnBlockExecuted( + height uint64, + resCol evmTypes.ReplayResultCollector, +) error { + if bp.latestBlock == nil { + return fmt.Errorf( + "received block execution for: %d without latest block", + height, + ) + } + + if bp.latestBlock.Height != height { + return fmt.Errorf( + "latest block height doesn't match expected: %d, got: %d", + bp.latestBlock.Height, + height, + ) + } + + bp.latestBlock = nil + + return nil +} + +func (bp *BlocksProvider) GetSnapshotAt(height uint64) ( + evmTypes.BlockSnapshot, + error, +) { + if bp.latestBlock != nil { + return bp, nil + } + + block, err := bp.blocks.GetByHeight(height) + if err != nil { + return nil, err + } + + return &BlocksProvider{ + blocks: bp.blocks, + chainID: bp.chainID, + tracer: bp.tracer, + latestBlock: block, + }, nil +} + +func (bs *BlocksProvider) BlockContext() (evmTypes.BlockContext, error) { + if bs.latestBlock == nil { + return evmTypes.BlockContext{}, fmt.Errorf( + "cannot create block context without latest block", + ) + } + + block := bs.latestBlock + return evmTypes.BlockContext{ + ChainID: evmTypes.EVMChainIDFromFlowChainID(bs.chainID), + BlockNumber: block.Height, + BlockTimestamp: block.Timestamp, + DirectCallBaseGasUsage: evmTypes.DefaultDirectCallBaseGasUsage, + DirectCallGasPrice: evmTypes.DefaultDirectCallGasPrice, + GasFeeCollector: evmTypes.CoinbaseAddress, + GetHashFunc: func(n uint64) gethCommon.Hash { + // For block heights greater than or equal to the current, + // return an empty block hash. + if n >= block.Height { + return gethCommon.Hash{} + } + // If the given block height, is more than 256 blocks + // in the past, return an empty block hash. + if block.Height-n > 256 { + return gethCommon.Hash{} + } + + block, err := bs.blocks.GetByHeight(n) + if err != nil { + return gethCommon.Hash{} + } + blockHash, err := block.Hash() + if err != nil { + return gethCommon.Hash{} + } + + return blockHash + }, + Random: block.PrevRandao, + Tracer: bs.tracer, + }, nil +}