diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 71539a72a6e..6a9cfdce56d 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -358,7 +358,8 @@ type FlowAccessNodeBuilder struct { stateStreamBackend *statestreambackend.StateStreamBackend nodeBackend *backend.Backend - TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore + ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider + TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore } func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder { @@ -1970,33 +1971,49 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { } + preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) + } + + fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) + } + + builder.ExecNodeIdentitiesProvider = commonrpc.NewExecutionNodeIdentitiesProvider( + node.Logger, + node.State, + node.Storage.Receipts, + preferredENIdentifiers, + fixedENIdentifiers, + ) + builder.nodeBackend, err = backend.New(backend.Params{ - State: node.State, - CollectionRPC: builder.CollectionRPC, - HistoricalAccessNodes: builder.HistoricalAccessRPCs, - Blocks: node.Storage.Blocks, - Headers: node.Storage.Headers, - Collections: node.Storage.Collections, - Transactions: node.Storage.Transactions, - ExecutionReceipts: node.Storage.Receipts, - ExecutionResults: node.Storage.Results, - TxResultErrorMessages: node.Storage.TransactionResultErrorMessages, - ChainID: node.RootChainID, - AccessMetrics: builder.AccessMetrics, - ConnFactory: connFactory, - RetryEnabled: builder.retryEnabled, - MaxHeightRange: backendConfig.MaxHeightRange, - PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs, - FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs, - Log: node.Logger, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled), - TxResultCacheSize: builder.TxResultCacheSize, - ScriptExecutor: builder.ScriptExecutor, - ScriptExecutionMode: scriptExecMode, - CheckPayerBalanceMode: checkPayerBalanceMode, - EventQueryMode: eventQueryMode, - BlockTracker: blockTracker, + State: node.State, + CollectionRPC: builder.CollectionRPC, + HistoricalAccessNodes: builder.HistoricalAccessRPCs, + Blocks: node.Storage.Blocks, + Headers: node.Storage.Headers, + Collections: node.Storage.Collections, + Transactions: node.Storage.Transactions, + ExecutionReceipts: node.Storage.Receipts, + ExecutionResults: node.Storage.Results, + TxResultErrorMessages: node.Storage.TransactionResultErrorMessages, + ChainID: node.RootChainID, + AccessMetrics: builder.AccessMetrics, + ConnFactory: connFactory, + RetryEnabled: builder.retryEnabled, + MaxHeightRange: backendConfig.MaxHeightRange, + Log: node.Logger, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled), + TxResultCacheSize: builder.TxResultCacheSize, + ScriptExecutor: builder.ScriptExecutor, + ScriptExecutionMode: scriptExecMode, + CheckPayerBalanceMode: checkPayerBalanceMode, + EventQueryMode: eventQueryMode, + BlockTracker: blockTracker, SubscriptionHandler: subscription.NewSubscriptionHandler( builder.Logger, broadcaster, @@ -2004,12 +2021,13 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.stateStreamConf.ResponseLimit, builder.stateStreamConf.ClientSendBufferSize, ), - EventsIndex: builder.EventsIndex, - TxResultQueryMode: txResultQueryMode, - TxResultsIndex: builder.TxResultsIndex, - LastFullBlockHeight: lastFullBlockHeight, - IndexReporter: indexReporter, - VersionControl: builder.VersionControl, + EventsIndex: builder.EventsIndex, + TxResultQueryMode: txResultQueryMode, + TxResultsIndex: builder.TxResultsIndex, + LastFullBlockHeight: lastFullBlockHeight, + IndexReporter: indexReporter, + VersionControl: builder.VersionControl, + ExecNodeIdentitiesProvider: builder.ExecNodeIdentitiesProvider, }) if err != nil { return nil, fmt.Errorf("could not initialize backend: %w", err) @@ -2063,25 +2081,12 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return nil, fmt.Errorf("could not create requester engine: %w", err) } - preferredENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) - } - - fixedENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.FixedExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) - } - if builder.storeTxResultErrorMessages { builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore( node.Logger, - node.State, builder.nodeBackend, - node.Storage.Receipts, node.Storage.TransactionResultErrorMessages, - preferredENIdentifiers, - fixedENIdentifiers, + builder.ExecNodeIdentitiesProvider, ) } diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index ea3b9a1654a..98fc1fc701a 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -51,6 +51,7 @@ import ( statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/engine/common/follower" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/stop" synceng "github.com/onflow/flow-go/engine/common/synchronization" "github.com/onflow/flow-go/engine/common/version" @@ -1869,25 +1870,41 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { indexReporter = builder.Reporter } + preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) + } + + fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) + } + + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + node.Logger, + node.State, + node.Storage.Receipts, + preferredENIdentifiers, + fixedENIdentifiers, + ) + backendParams := backend.Params{ - State: node.State, - Blocks: node.Storage.Blocks, - Headers: node.Storage.Headers, - Collections: node.Storage.Collections, - Transactions: node.Storage.Transactions, - ExecutionReceipts: node.Storage.Receipts, - ExecutionResults: node.Storage.Results, - ChainID: node.RootChainID, - AccessMetrics: accessMetrics, - ConnFactory: connFactory, - RetryEnabled: false, - MaxHeightRange: backendConfig.MaxHeightRange, - PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs, - FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs, - Log: node.Logger, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled), - BlockTracker: blockTracker, + State: node.State, + Blocks: node.Storage.Blocks, + Headers: node.Storage.Headers, + Collections: node.Storage.Collections, + Transactions: node.Storage.Transactions, + ExecutionReceipts: node.Storage.Receipts, + ExecutionResults: node.Storage.Results, + ChainID: node.RootChainID, + AccessMetrics: accessMetrics, + ConnFactory: connFactory, + RetryEnabled: false, + MaxHeightRange: backendConfig.MaxHeightRange, + Log: node.Logger, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled), + BlockTracker: blockTracker, SubscriptionHandler: subscription.NewSubscriptionHandler( builder.Logger, broadcaster, @@ -1895,8 +1912,9 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.stateStreamConf.ResponseLimit, builder.stateStreamConf.ClientSendBufferSize, ), - IndexReporter: indexReporter, - VersionControl: builder.VersionControl, + IndexReporter: indexReporter, + VersionControl: builder.VersionControl, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, } if builder.localServiceAPIEnabled { diff --git a/engine/access/access_test.go b/engine/access/access_test.go index db0e712a6aa..d170deb7afe 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -26,6 +26,7 @@ import ( "github.com/onflow/flow-go/engine/access/rpc/backend" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" "github.com/onflow/flow-go/engine/access/subscription" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/factory" @@ -644,23 +645,32 @@ func (suite *Suite) TestGetSealedTransaction() { blockTransactions, err := stdmap.NewIdentifierMap(100) require.NoError(suite.T(), err) - bnd, err := backend.New(backend.Params{State: suite.state, - CollectionRPC: suite.collClient, - Blocks: all.Blocks, - Headers: all.Headers, - Collections: collections, - Transactions: transactions, - ExecutionReceipts: receipts, - ExecutionResults: results, - ChainID: suite.chainID, - AccessMetrics: suite.metrics, - ConnFactory: connFactory, - MaxHeightRange: backend.DefaultMaxHeightRange, - PreferredExecutionNodeIDs: enNodeIDs.Strings(), - Log: suite.log, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(false), - TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + receipts, + enNodeIDs, + nil, + ) + + bnd, err := backend.New(backend.Params{ + State: suite.state, + CollectionRPC: suite.collClient, + Blocks: all.Blocks, + Headers: all.Headers, + Collections: collections, + Transactions: transactions, + ExecutionReceipts: receipts, + ExecutionResults: results, + ChainID: suite.chainID, + AccessMetrics: suite.metrics, + ConnFactory: connFactory, + MaxHeightRange: backend.DefaultMaxHeightRange, + Log: suite.log, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(false), + TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, }) require.NoError(suite.T(), err) @@ -826,23 +836,31 @@ func (suite *Suite) TestGetTransactionResult() { blockTransactions, err := stdmap.NewIdentifierMap(100) require.NoError(suite.T(), err) + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + receipts, + enNodeIDs, + nil, + ) + bnd, err := backend.New(backend.Params{State: suite.state, - CollectionRPC: suite.collClient, - Blocks: all.Blocks, - Headers: all.Headers, - Collections: collections, - Transactions: transactions, - ExecutionReceipts: receipts, - ExecutionResults: results, - ChainID: suite.chainID, - AccessMetrics: suite.metrics, - ConnFactory: connFactory, - MaxHeightRange: backend.DefaultMaxHeightRange, - PreferredExecutionNodeIDs: enNodeIDs.Strings(), - Log: suite.log, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(false), - TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + CollectionRPC: suite.collClient, + Blocks: all.Blocks, + Headers: all.Headers, + Collections: collections, + Transactions: transactions, + ExecutionReceipts: receipts, + ExecutionResults: results, + ChainID: suite.chainID, + AccessMetrics: suite.metrics, + ConnFactory: connFactory, + MaxHeightRange: backend.DefaultMaxHeightRange, + Log: suite.log, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(false), + TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, }) require.NoError(suite.T(), err) @@ -1056,26 +1074,34 @@ func (suite *Suite) TestExecuteScript() { connFactory := connectionmock.NewConnectionFactory(suite.T()) connFactory.On("GetExecutionAPIClient", mock.Anything).Return(suite.execClient, &mockCloser{}, nil) + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + receipts, + nil, + identities.NodeIDs(), + ) + var err error suite.backend, err = backend.New(backend.Params{ - State: suite.state, - CollectionRPC: suite.collClient, - Blocks: all.Blocks, - Headers: all.Headers, - Collections: collections, - Transactions: transactions, - ExecutionReceipts: receipts, - ExecutionResults: results, - ChainID: suite.chainID, - AccessMetrics: suite.metrics, - ConnFactory: connFactory, - MaxHeightRange: backend.DefaultMaxHeightRange, - FixedExecutionNodeIDs: (identities.NodeIDs()).Strings(), - Log: suite.log, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(false), - ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, - TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + State: suite.state, + CollectionRPC: suite.collClient, + Blocks: all.Blocks, + Headers: all.Headers, + Collections: collections, + Transactions: transactions, + ExecutionReceipts: receipts, + ExecutionResults: results, + ChainID: suite.chainID, + AccessMetrics: suite.metrics, + ConnFactory: connFactory, + MaxHeightRange: backend.DefaultMaxHeightRange, + Log: suite.log, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(false), + ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, + TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, }) require.NoError(suite.T(), err) diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_core.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_core.go index 88f3b93ec5d..f41a49ab409 100644 --- a/engine/access/ingestion/tx_error_messages/tx_error_messages_core.go +++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_core.go @@ -4,51 +4,40 @@ import ( "context" "fmt" - execproto "github.com/onflow/flow/protobuf/go/flow/execution" "github.com/rs/zerolog" "github.com/onflow/flow-go/engine/access/rpc/backend" commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" + + execproto "github.com/onflow/flow/protobuf/go/flow/execution" ) // TxErrorMessagesCore is responsible for managing transaction result error messages // It handles both storage and retrieval of error messages // from execution nodes. type TxErrorMessagesCore struct { - log zerolog.Logger // used to log relevant actions with context - state protocol.State // used to access the protocol state + log zerolog.Logger // used to log relevant actions with context - backend *backend.Backend - - executionReceipts storage.ExecutionReceipts + backend *backend.Backend transactionResultErrorMessages storage.TransactionResultErrorMessages - - preferredExecutionNodeIDs flow.IdentifierList - fixedExecutionNodeIDs flow.IdentifierList + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } // NewTxErrorMessagesCore creates a new instance of TxErrorMessagesCore. func NewTxErrorMessagesCore( log zerolog.Logger, - state protocol.State, backend *backend.Backend, - executionReceipts storage.ExecutionReceipts, transactionResultErrorMessages storage.TransactionResultErrorMessages, - preferredExecutionNodeIDs flow.IdentifierList, - fixedExecutionNodeIDs flow.IdentifierList, + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider, ) *TxErrorMessagesCore { return &TxErrorMessagesCore{ log: log.With().Str("module", "tx_error_messages_core").Logger(), - state: state, backend: backend, - executionReceipts: executionReceipts, transactionResultErrorMessages: transactionResultErrorMessages, - preferredExecutionNodeIDs: preferredExecutionNodeIDs, - fixedExecutionNodeIDs: fixedExecutionNodeIDs, + execNodeIdentitiesProvider: execNodeIdentitiesProvider, } } @@ -74,15 +63,7 @@ func (c *TxErrorMessagesCore) HandleTransactionResultErrorMessages(ctx context.C } // retrieves error messages from the backend if they do not already exist in storage - execNodes, err := commonrpc.ExecutionNodesForBlockID( - ctx, - blockID, - c.executionReceipts, - c.state, - c.log, - c.preferredExecutionNodeIDs, - c.fixedExecutionNodeIDs, - ) + execNodes, err := c.execNodeIdentitiesProvider.ExecutionNodesForBlockID(ctx, blockID) if err != nil { c.log.Error().Err(err).Msg(fmt.Sprintf("failed to find execution nodes for block id: %s", blockID)) return fmt.Errorf("could not find execution nodes for block: %w", err) diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go index 9d5d6466d97..aa8a51b42f8 100644 --- a/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go +++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go @@ -15,6 +15,7 @@ import ( accessmock "github.com/onflow/flow-go/engine/access/mock" "github.com/onflow/flow-go/engine/access/rpc/backend" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" protocol "github.com/onflow/flow-go/state/protocol/mock" @@ -230,30 +231,35 @@ func (s *TxErrorMessagesCoreSuite) TestHandleTransactionResultErrorMessages_Erro // initCore create new instance of transaction error messages core. func (s *TxErrorMessagesCoreSuite) initCore() *TxErrorMessagesCore { + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.proto.state, + s.receipts, + flow.IdentifierList{}, + s.enNodeIDs.NodeIDs(), + ) + // Initialize the backend backend, err := backend.New(backend.Params{ - State: s.proto.state, - ExecutionReceipts: s.receipts, - ConnFactory: s.connFactory, - MaxHeightRange: backend.DefaultMaxHeightRange, - FixedExecutionNodeIDs: s.enNodeIDs.NodeIDs().Strings(), - Log: s.log, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(false), - ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, - TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, - ChainID: flow.Testnet, + State: s.proto.state, + ExecutionReceipts: s.receipts, + ConnFactory: s.connFactory, + MaxHeightRange: backend.DefaultMaxHeightRange, + Log: s.log, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(false), + ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, + TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + ChainID: flow.Testnet, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, }) require.NoError(s.T(), err) core := NewTxErrorMessagesCore( s.log, - s.proto.state, backend, - s.receipts, s.txErrorMessages, - s.enNodeIDs.NodeIDs(), - nil, + execNodeIdentitiesProvider, ) return core } diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go index 44980d7c79f..b1edcffb9a5 100644 --- a/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go +++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go @@ -18,6 +18,7 @@ import ( accessmock "github.com/onflow/flow-go/engine/access/mock" "github.com/onflow/flow-go/engine/access/rpc/backend" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/irrecoverable" @@ -135,31 +136,36 @@ func (s *TxErrorMessagesEngineSuite) initEngine(ctx irrecoverable.SignalerContex module.ConsumeProgressEngineTxErrorMessagesBlockHeight, ) + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.proto.state, + s.receipts, + s.enNodeIDs.NodeIDs(), + flow.IdentifierList{}, + ) + // Initialize the backend with the mocked state, blocks, headers, transactions, etc. backend, err := backend.New(backend.Params{ - State: s.proto.state, - Headers: s.headers, - ExecutionReceipts: s.receipts, - ConnFactory: s.connFactory, - MaxHeightRange: backend.DefaultMaxHeightRange, - FixedExecutionNodeIDs: s.enNodeIDs.NodeIDs().Strings(), - Log: s.log, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(false), - ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, - TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, - ChainID: flow.Testnet, + State: s.proto.state, + Headers: s.headers, + ExecutionReceipts: s.receipts, + ConnFactory: s.connFactory, + MaxHeightRange: backend.DefaultMaxHeightRange, + Log: s.log, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(false), + ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, + TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + ChainID: flow.Testnet, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, }) require.NoError(s.T(), err) txResultErrorMessagesCore := NewTxErrorMessagesCore( s.log, - s.proto.state, backend, - s.receipts, s.txErrorMessages, - s.enNodeIDs.NodeIDs(), - nil, + execNodeIdentitiesProvider, ) eng, err := New( diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index a48f335670d..d4666af9529 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -43,11 +43,6 @@ const DefaultLoggedScriptsCacheSize = 1_000_000 // DefaultConnectionPoolSize is the default size for the connection pool to collection and execution nodes const DefaultConnectionPoolSize = 250 -var ( - preferredENIdentifiers flow.IdentifierList - fixedENIdentifiers flow.IdentifierList -) - // Backend implements the Access API. // // It is composed of several sub-backends that implement part of the Access API. @@ -84,40 +79,39 @@ type Backend struct { } type Params struct { - State protocol.State - CollectionRPC accessproto.AccessAPIClient - HistoricalAccessNodes []accessproto.AccessAPIClient - Blocks storage.Blocks - Headers storage.Headers - Collections storage.Collections - Transactions storage.Transactions - ExecutionReceipts storage.ExecutionReceipts - ExecutionResults storage.ExecutionResults - TxResultErrorMessages storage.TransactionResultErrorMessages - ChainID flow.ChainID - AccessMetrics module.AccessMetrics - ConnFactory connection.ConnectionFactory - RetryEnabled bool - MaxHeightRange uint - PreferredExecutionNodeIDs []string - FixedExecutionNodeIDs []string - Log zerolog.Logger - SnapshotHistoryLimit int - Communicator Communicator - TxResultCacheSize uint - ScriptExecutor execution.ScriptExecutor - ScriptExecutionMode IndexQueryMode - CheckPayerBalanceMode access.PayerBalanceMode - EventQueryMode IndexQueryMode - BlockTracker subscription.BlockTracker - SubscriptionHandler *subscription.SubscriptionHandler - - EventsIndex *index.EventsIndex - TxResultQueryMode IndexQueryMode - TxResultsIndex *index.TransactionResultsIndex - LastFullBlockHeight *counters.PersistentStrictMonotonicCounter - IndexReporter state_synchronization.IndexReporter - VersionControl *version.VersionControl + State protocol.State + CollectionRPC accessproto.AccessAPIClient + HistoricalAccessNodes []accessproto.AccessAPIClient + Blocks storage.Blocks + Headers storage.Headers + Collections storage.Collections + Transactions storage.Transactions + ExecutionReceipts storage.ExecutionReceipts + ExecutionResults storage.ExecutionResults + TxResultErrorMessages storage.TransactionResultErrorMessages + ChainID flow.ChainID + AccessMetrics module.AccessMetrics + ConnFactory connection.ConnectionFactory + RetryEnabled bool + MaxHeightRange uint + Log zerolog.Logger + SnapshotHistoryLimit int + Communicator Communicator + TxResultCacheSize uint + ScriptExecutor execution.ScriptExecutor + ScriptExecutionMode IndexQueryMode + CheckPayerBalanceMode access.PayerBalanceMode + EventQueryMode IndexQueryMode + BlockTracker subscription.BlockTracker + SubscriptionHandler *subscription.SubscriptionHandler + + EventsIndex *index.EventsIndex + TxResultQueryMode IndexQueryMode + TxResultsIndex *index.TransactionResultsIndex + LastFullBlockHeight *counters.PersistentStrictMonotonicCounter + IndexReporter state_synchronization.IndexReporter + VersionControl *version.VersionControl + ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } var _ TransactionErrorMessage = (*Backend)(nil) @@ -164,28 +158,28 @@ func New(params Params) (*Backend, error) { BlockTracker: params.BlockTracker, // create the sub-backends backendScripts: backendScripts{ - log: params.Log, - headers: params.Headers, - executionReceipts: params.ExecutionReceipts, - connFactory: params.ConnFactory, - state: params.State, - metrics: params.AccessMetrics, - loggedScripts: loggedScripts, - nodeCommunicator: params.Communicator, - scriptExecutor: params.ScriptExecutor, - scriptExecMode: params.ScriptExecutionMode, + log: params.Log, + headers: params.Headers, + connFactory: params.ConnFactory, + state: params.State, + metrics: params.AccessMetrics, + loggedScripts: loggedScripts, + nodeCommunicator: params.Communicator, + scriptExecutor: params.ScriptExecutor, + scriptExecMode: params.ScriptExecutionMode, + execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, }, backendEvents: backendEvents{ - log: params.Log, - chain: params.ChainID.Chain(), - state: params.State, - headers: params.Headers, - executionReceipts: params.ExecutionReceipts, - connFactory: params.ConnFactory, - maxHeightRange: params.MaxHeightRange, - nodeCommunicator: params.Communicator, - queryMode: params.EventQueryMode, - eventsIndex: params.EventsIndex, + log: params.Log, + chain: params.ChainID.Chain(), + state: params.State, + headers: params.Headers, + connFactory: params.ConnFactory, + maxHeightRange: params.MaxHeightRange, + nodeCommunicator: params.Communicator, + queryMode: params.EventQueryMode, + eventsIndex: params.EventsIndex, + execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, }, backendBlockHeaders: backendBlockHeaders{ headers: params.Headers, @@ -196,14 +190,14 @@ func New(params Params) (*Backend, error) { state: params.State, }, backendAccounts: backendAccounts{ - log: params.Log, - state: params.State, - headers: params.Headers, - executionReceipts: params.ExecutionReceipts, - connFactory: params.ConnFactory, - nodeCommunicator: params.Communicator, - scriptExecutor: params.ScriptExecutor, - scriptExecMode: params.ScriptExecutionMode, + log: params.Log, + state: params.State, + headers: params.Headers, + connFactory: params.ConnFactory, + nodeCommunicator: params.Communicator, + scriptExecutor: params.ScriptExecutor, + scriptExecMode: params.ScriptExecutionMode, + execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, }, backendExecutionResults: backendExecutionResults{ executionResults: params.ExecutionResults, @@ -242,7 +236,6 @@ func New(params Params) (*Backend, error) { staticCollectionRPC: params.CollectionRPC, chainID: params.ChainID, transactions: params.Transactions, - executionReceipts: params.ExecutionReceipts, txResultErrorMessages: params.TxResultErrorMessages, transactionValidator: txValidator, transactionMetrics: params.AccessMetrics, @@ -254,6 +247,7 @@ func New(params Params) (*Backend, error) { txResultQueryMode: params.TxResultQueryMode, systemTx: systemTx, systemTxID: systemTxID, + execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, } // TODO: The TransactionErrorMessage interface should be reorganized in future, as it is implemented in backendTransactions but used in TransactionsLocalDataProvider, and its initialization is somewhat quirky. @@ -270,16 +264,6 @@ func New(params Params) (*Backend, error) { retry.SetBackend(b) - preferredENIdentifiers, err = commonrpc.IdentifierList(params.PreferredExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) - } - - fixedENIdentifiers, err = commonrpc.IdentifierList(params.FixedExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) - } - return b, nil } diff --git a/engine/access/rpc/backend/backend_accounts.go b/engine/access/rpc/backend/backend_accounts.go index f3a38219a31..609d1c063a8 100644 --- a/engine/access/rpc/backend/backend_accounts.go +++ b/engine/access/rpc/backend/backend_accounts.go @@ -25,14 +25,14 @@ import ( ) type backendAccounts struct { - log zerolog.Logger - state protocol.State - headers storage.Headers - executionReceipts storage.ExecutionReceipts - connFactory connection.ConnectionFactory - nodeCommunicator Communicator - scriptExecutor execution.ScriptExecutor - scriptExecMode IndexQueryMode + log zerolog.Logger + state protocol.State + headers storage.Headers + connFactory connection.ConnectionFactory + nodeCommunicator Communicator + scriptExecutor execution.ScriptExecutor + scriptExecMode IndexQueryMode + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } // GetAccount returns the account details at the latest sealed block. @@ -420,14 +420,9 @@ func (b *backendAccounts) getAccountFromAnyExeNode( BlockId: blockID[:], } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { return nil, rpc.ConvertError(err, "failed to find execution node to query", codes.Internal) diff --git a/engine/access/rpc/backend/backend_accounts_test.go b/engine/access/rpc/backend/backend_accounts_test.go index 9af885be191..86561eefb7a 100644 --- a/engine/access/rpc/backend/backend_accounts_test.go +++ b/engine/access/rpc/backend/backend_accounts_test.go @@ -13,6 +13,7 @@ import ( access "github.com/onflow/flow-go/engine/access/mock" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" execmock "github.com/onflow/flow-go/module/execution/mock" @@ -77,12 +78,18 @@ func (s *BackendAccountsSuite) SetupTest() { func (s *BackendAccountsSuite) defaultBackend() *backendAccounts { return &backendAccounts{ - log: s.log, - state: s.state, - headers: s.headers, - executionReceipts: s.receipts, - connFactory: s.connectionFactory, - nodeCommunicator: NewNodeCommunicator(false), + log: s.log, + state: s.state, + headers: s.headers, + connFactory: s.connectionFactory, + nodeCommunicator: NewNodeCommunicator(false), + execNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.state, + s.receipts, + flow.IdentifierList{}, + flow.IdentifierList{}, + ), } } diff --git a/engine/access/rpc/backend/backend_events.go b/engine/access/rpc/backend/backend_events.go index 9f762672996..56f3207e8b7 100644 --- a/engine/access/rpc/backend/backend_events.go +++ b/engine/access/rpc/backend/backend_events.go @@ -28,16 +28,16 @@ import ( ) type backendEvents struct { - headers storage.Headers - executionReceipts storage.ExecutionReceipts - state protocol.State - chain flow.Chain - connFactory connection.ConnectionFactory - log zerolog.Logger - maxHeightRange uint - nodeCommunicator Communicator - queryMode IndexQueryMode - eventsIndex *index.EventsIndex + headers storage.Headers + state protocol.State + chain flow.Chain + connFactory connection.ConnectionFactory + log zerolog.Logger + maxHeightRange uint + nodeCommunicator Communicator + queryMode IndexQueryMode + eventsIndex *index.EventsIndex + execNodeIdentitiesProvider *rpc.ExecutionNodeIdentitiesProvider } // blockMetadata is used to capture information about requested blocks to avoid repeated blockID @@ -303,13 +303,10 @@ func (b *backendEvents) getBlockEventsFromExecutionNode( // choose the last block ID to find the list of execution nodes lastBlockID := blockIDs[len(blockIDs)-1] - execNodes, err := rpc.ExecutionNodesForBlockID(ctx, + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( + ctx, lastBlockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers) + ) if err != nil { return nil, rpc.ConvertError(err, "failed to retrieve events from execution node", codes.Internal) } diff --git a/engine/access/rpc/backend/backend_events_test.go b/engine/access/rpc/backend/backend_events_test.go index 2a6afca4c26..9f28efa3fa8 100644 --- a/engine/access/rpc/backend/backend_events_test.go +++ b/engine/access/rpc/backend/backend_events_test.go @@ -21,6 +21,7 @@ import ( "github.com/onflow/flow-go/engine/access/index" access "github.com/onflow/flow-go/engine/access/mock" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" @@ -175,16 +176,22 @@ func (s *BackendEventsSuite) SetupTest() { func (s *BackendEventsSuite) defaultBackend() *backendEvents { return &backendEvents{ - log: s.log, - chain: s.chainID.Chain(), - state: s.state, - headers: s.headers, - executionReceipts: s.receipts, - connFactory: s.connectionFactory, - nodeCommunicator: NewNodeCommunicator(false), - maxHeightRange: DefaultMaxHeightRange, - queryMode: IndexQueryModeExecutionNodesOnly, - eventsIndex: s.eventsIndex, + log: s.log, + chain: s.chainID.Chain(), + state: s.state, + headers: s.headers, + connFactory: s.connectionFactory, + nodeCommunicator: NewNodeCommunicator(false), + maxHeightRange: DefaultMaxHeightRange, + queryMode: IndexQueryModeExecutionNodesOnly, + eventsIndex: s.eventsIndex, + execNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.state, + s.receipts, + flow.IdentifierList{}, + flow.IdentifierList{}, + ), } } diff --git a/engine/access/rpc/backend/backend_scripts.go b/engine/access/rpc/backend/backend_scripts.go index 7d22f117912..b0237447d46 100644 --- a/engine/access/rpc/backend/backend_scripts.go +++ b/engine/access/rpc/backend/backend_scripts.go @@ -28,16 +28,16 @@ import ( const uniqueScriptLoggingTimeWindow = 10 * time.Minute type backendScripts struct { - log zerolog.Logger - headers storage.Headers - executionReceipts storage.ExecutionReceipts - state protocol.State - connFactory connection.ConnectionFactory - metrics module.BackendScriptsMetrics - loggedScripts *lru.Cache[[md5.Size]byte, time.Time] - nodeCommunicator Communicator - scriptExecutor execution.ScriptExecutor - scriptExecMode IndexQueryMode + log zerolog.Logger + headers storage.Headers + state protocol.State + connFactory connection.ConnectionFactory + metrics module.BackendScriptsMetrics + loggedScripts *lru.Cache[[md5.Size]byte, time.Time] + nodeCommunicator Communicator + scriptExecutor execution.ScriptExecutor + scriptExecMode IndexQueryMode + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } // scriptExecutionRequest encapsulates the data needed to execute a script to make it easier @@ -225,7 +225,7 @@ func (b *backendScripts) executeScriptOnAvailableExecutionNodes( r *scriptExecutionRequest, ) ([]byte, time.Duration, error) { // find few execution nodes which have executed the block earlier and provided an execution receipt for it - executors, err := commonrpc.ExecutionNodesForBlockID(ctx, r.blockID, b.executionReceipts, b.state, b.log, preferredENIdentifiers, fixedENIdentifiers) + executors, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID(ctx, r.blockID) if err != nil { return nil, 0, status.Errorf(codes.Internal, "failed to find script executors at blockId %v: %v", r.blockID.String(), err) } diff --git a/engine/access/rpc/backend/backend_scripts_test.go b/engine/access/rpc/backend/backend_scripts_test.go index 398d9ac33ec..e4b6ccbee7b 100644 --- a/engine/access/rpc/backend/backend_scripts_test.go +++ b/engine/access/rpc/backend/backend_scripts_test.go @@ -18,6 +18,7 @@ import ( access "github.com/onflow/flow-go/engine/access/mock" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" fvmerrors "github.com/onflow/flow-go/fvm/errors" "github.com/onflow/flow-go/model/flow" execmock "github.com/onflow/flow-go/module/execution/mock" @@ -96,14 +97,20 @@ func (s *BackendScriptsSuite) defaultBackend() *backendScripts { s.Require().NoError(err) return &backendScripts{ - log: s.log, - metrics: metrics.NewNoopCollector(), - state: s.state, - headers: s.headers, - executionReceipts: s.receipts, - loggedScripts: loggedScripts, - connFactory: s.connectionFactory, - nodeCommunicator: NewNodeCommunicator(false), + log: s.log, + metrics: metrics.NewNoopCollector(), + state: s.state, + headers: s.headers, + loggedScripts: loggedScripts, + connFactory: s.connectionFactory, + nodeCommunicator: NewNodeCommunicator(false), + execNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.state, + s.receipts, + flow.IdentifierList{}, + flow.IdentifierList{}, + ), } } diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index 617135ad1aa..8e15da70106 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -89,6 +89,9 @@ type Suite struct { chainID flow.ChainID systemTx *flow.TransactionBody + + fixedExecutionNodeIDs flow.IdentifierList + preferredExecutionNodeIDs flow.IdentifierList } func TestHandler(t *testing.T) { @@ -921,10 +924,11 @@ func (suite *Suite) TestGetTransactionResultByIndex() { Events: nil, } + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -962,7 +966,6 @@ func (suite *Suite) TestGetTransactionResultsByBlockID() { suite.state.On("Sealed").Return(suite.snapshot, nil).Maybe() ctx := context.Background() - params := suite.defaultBackendParams() block := unittest.BlockFixture() sporkRootBlockHeight := suite.state.Params().SporkRootBlockHeight() @@ -986,9 +989,11 @@ func (suite *Suite) TestGetTransactionResultsByBlockID() { TransactionResults: []*execproto.GetTransactionResultResponse{{}}, } + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1076,10 +1081,11 @@ func (suite *Suite) TestTransactionStatusTransition() { Events: nil, } + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1332,12 +1338,11 @@ func (suite *Suite) TestTransactionPendingToFinalizedStatusTransition() { params := suite.defaultBackendParams() params.ConnFactory = connFactory params.MaxHeightRange = TEST_MAX_HEIGHT + suite.preferredExecutionNodeIDs = flow.IdentifierList{receipts[0].ExecutorID} backend, err := New(params) suite.Require().NoError(err) - preferredENIdentifiers = flow.IdentifierList{receipts[0].ExecutorID} - // should return pending status when we have not observed collection for the transaction suite.Run("pending", func() { currentState = flow.TransactionStatusPending @@ -1472,10 +1477,11 @@ func (suite *Suite) TestGetExecutionResultByID() { Return(executionResult, nil) suite.Run("nonexisting execution result for id", func() { + suite.fixedExecutionNodeIDs = validENIDs + params := suite.defaultBackendParams() params.ExecutionResults = results params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = validENIDs.Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1487,10 +1493,11 @@ func (suite *Suite) TestGetExecutionResultByID() { }) suite.Run("existing execution result id", func() { + suite.fixedExecutionNodeIDs = validENIDs + params := suite.defaultBackendParams() params.ExecutionResults = results params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = validENIDs.Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1534,10 +1541,11 @@ func (suite *Suite) TestGetExecutionResultByBlockID() { Return(executionResult, nil) suite.Run("nonexisting execution results", func() { + suite.fixedExecutionNodeIDs = validENIDs + params := suite.defaultBackendParams() params.ExecutionResults = results params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = validENIDs.Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1549,10 +1557,11 @@ func (suite *Suite) TestGetExecutionResultByBlockID() { }) suite.Run("existing execution results", func() { + suite.fixedExecutionNodeIDs = validENIDs + params := suite.defaultBackendParams() params.ExecutionResults = results params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = validENIDs.Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1721,200 +1730,6 @@ func (suite *Suite) TestGetNetworkParameters() { suite.Require().Equal(expectedChainID, actual.ChainID) } -// TestExecutionNodesForBlockID tests the common method backend.ExecutionNodesForBlockID used for serving all API calls -// that need to talk to an execution node. -func (suite *Suite) TestExecutionNodesForBlockID() { - - totalReceipts := 5 - - block := unittest.BlockFixture() - - // generate one execution node identities for each receipt assuming that each ER is generated by a unique exec node - allExecutionNodes := unittest.IdentityListFixture(totalReceipts, unittest.WithRole(flow.RoleExecution)) - - // one execution result for all receipts for this block - executionResult := unittest.ExecutionResultFixture() - - // generate execution receipts - receipts := make(flow.ExecutionReceiptList, totalReceipts) - for j := 0; j < totalReceipts; j++ { - r := unittest.ReceiptForBlockFixture(&block) - r.ExecutorID = allExecutionNodes[j].NodeID - er := *executionResult - r.ExecutionResult = er - receipts[j] = r - } - - currentAttempt := 0 - attempt1Receipts, attempt2Receipts, attempt3Receipts := receipts, receipts, receipts - - // setup receipts storage mock to return different list of receipts on each call - suite.receipts. - On("ByBlockID", block.ID()).Return( - func(id flow.Identifier) flow.ExecutionReceiptList { - switch currentAttempt { - case 0: - currentAttempt++ - return attempt1Receipts - case 1: - currentAttempt++ - return attempt2Receipts - default: - currentAttempt = 0 - return attempt3Receipts - } - }, - func(id flow.Identifier) error { return nil }) - - suite.snapshot.On("Identities", mock.Anything).Return( - func(filter flow.IdentityFilter[flow.Identity]) flow.IdentityList { - // apply the filter passed in to the list of all the execution nodes - return allExecutionNodes.Filter(filter) - }, - func(flow.IdentityFilter[flow.Identity]) error { return nil }) - suite.state.On("Final").Return(suite.snapshot, nil).Maybe() - - testExecutionNodesForBlockID := func(preferredENs, fixedENs, expectedENs flow.IdentityList) { - - if preferredENs != nil { - preferredENIdentifiers = preferredENs.NodeIDs() - } - if fixedENs != nil { - fixedENIdentifiers = fixedENs.NodeIDs() - } - - if expectedENs == nil { - expectedENs = flow.IdentityList{} - } - - allExecNodes, err := commonrpc.ExecutionNodesForBlockID(context.Background(), block.ID(), suite.receipts, suite.state, suite.log, preferredENIdentifiers, fixedENIdentifiers) - require.NoError(suite.T(), err) - - execNodeSelectorFactory := NodeSelectorFactory{circuitBreakerEnabled: false} - execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) - require.NoError(suite.T(), err) - - actualList := flow.IdentitySkeletonList{} - for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { - actualList = append(actualList, actual) - } - - { - expectedENs := expectedENs.ToSkeleton() - if len(expectedENs) > commonrpc.MaxNodesCnt { - for _, actual := range actualList { - require.Contains(suite.T(), expectedENs, actual) - } - } else { - require.ElementsMatch(suite.T(), actualList, expectedENs) - } - } - } - // if we don't find sufficient receipts, ExecutionNodesForBlockID should return a list of random ENs - suite.Run("insufficient receipts return random ENs in State", func() { - // return no receipts at all attempts - attempt1Receipts = flow.ExecutionReceiptList{} - attempt2Receipts = flow.ExecutionReceiptList{} - attempt3Receipts = flow.ExecutionReceiptList{} - suite.state.On("AtBlockID", mock.Anything).Return(suite.snapshot) - - allExecNodes, err := commonrpc.ExecutionNodesForBlockID(context.Background(), block.ID(), suite.receipts, suite.state, suite.log, preferredENIdentifiers, - fixedENIdentifiers) - require.NoError(suite.T(), err) - - execNodeSelectorFactory := NodeSelectorFactory{circuitBreakerEnabled: false} - execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) - require.NoError(suite.T(), err) - - actualList := flow.IdentitySkeletonList{} - for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { - actualList = append(actualList, actual) - } - - require.Equal(suite.T(), len(actualList), commonrpc.MaxNodesCnt) - }) - - // if no preferred or fixed ENs are specified, the ExecutionNodesForBlockID function should - // return the exe node list without a filter - suite.Run("no preferred or fixed ENs", func() { - testExecutionNodesForBlockID(nil, nil, allExecutionNodes) - }) - // if only fixed ENs are specified, the ExecutionNodesForBlockID function should - // return the fixed ENs list - suite.Run("two fixed ENs with zero preferred EN", func() { - // mark the first two ENs as fixed - fixedENs := allExecutionNodes[0:2] - expectedList := fixedENs - testExecutionNodesForBlockID(nil, fixedENs, expectedList) - }) - // if only preferred ENs are specified, the ExecutionNodesForBlockID function should - // return the preferred ENs list - suite.Run("two preferred ENs with zero fixed EN", func() { - // mark the first two ENs as preferred - preferredENs := allExecutionNodes[0:2] - expectedList := allExecutionNodes[0:commonrpc.MaxNodesCnt] - testExecutionNodesForBlockID(preferredENs, nil, expectedList) - }) - // if both are specified, the ExecutionNodesForBlockID function should - // return the preferred ENs list - suite.Run("four fixed ENs of which two are preferred ENs", func() { - // mark the first four ENs as fixed - fixedENs := allExecutionNodes[0:5] - // mark the first two of the fixed ENs as preferred ENs - preferredENs := fixedENs[0:2] - expectedList := fixedENs[0:commonrpc.MaxNodesCnt] - testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList) - }) - // if both are specified, but the preferred ENs don't match the ExecutorIDs in the ER, - // the ExecutionNodesForBlockID function should return the fixed ENs list - suite.Run("four fixed ENs of which two are preferred ENs but have not generated the ER", func() { - // mark the first two ENs as fixed - fixedENs := allExecutionNodes[0:2] - // specify two ENs not specified in the ERs as preferred - preferredENs := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) - // add one more node ID besides of the fixed ENs list cause expected length of the list should be maxNodesCnt - expectedList := append(fixedENs, allExecutionNodes[2]) - testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList) - }) - // if execution receipts are not yet available, the ExecutionNodesForBlockID function should retry twice - suite.Run("retry execution receipt query", func() { - // on first attempt, no execution receipts are available - attempt1Receipts = flow.ExecutionReceiptList{} - // on second attempt ony one is available - attempt2Receipts = flow.ExecutionReceiptList{receipts[0]} - // on third attempt all receipts are available - attempt3Receipts = receipts - currentAttempt = 0 - // mark the first two ENs as preferred - preferredENs := allExecutionNodes[0:2] - expectedList := allExecutionNodes[0:commonrpc.MaxNodesCnt] - testExecutionNodesForBlockID(preferredENs, nil, expectedList) - }) - // if preferredENIdentifiers was set and there are less than maxNodesCnt nodes selected than check the order - // of adding ENs ids - suite.Run("add nodes in the correct order", func() { - // mark the first EN as preferred - preferredENIdentifiers = allExecutionNodes[0:1].NodeIDs() - // mark the fourth EN with receipt - executorIDs := allExecutionNodes[3:4].NodeIDs() - - receiptNodes := allExecutionNodes[3:4] // any EN with a receipt - preferredNodes := allExecutionNodes[0:1] // preferred EN node not already selected - additionalNode := allExecutionNodes[1:2] // any EN not already selected - - expectedOrder := flow.IdentityList{ - receiptNodes[0], - preferredNodes[0], - additionalNode[0], - } - - chosenIDs := commonrpc.ChooseFromPreferredENIDs(allExecutionNodes, executorIDs, preferredENIdentifiers) - - require.ElementsMatch(suite.T(), chosenIDs, expectedOrder) - require.Equal(suite.T(), len(chosenIDs), commonrpc.MaxNodesCnt) - }) -} - // TestGetTransactionResultEventEncodingVersion tests the GetTransactionResult function with different event encoding versions. func (suite *Suite) TestGetTransactionResultEventEncodingVersion() { suite.state.On("Sealed").Return(suite.snapshot, nil).Maybe() @@ -1957,10 +1772,11 @@ func (suite *Suite) TestGetTransactionResultEventEncodingVersion() { suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -2019,10 +1835,11 @@ func (suite *Suite) TestGetTransactionResultByIndexAndBlockIdEventEncodingVersio suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -2119,12 +1936,13 @@ func (suite *Suite) TestNodeCommunicator() { BlockId: blockId[:], } + // Left only one preferred execution node + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + suite.preferredExecutionNodeIDs = flow.IdentifierList{fixedENIDs[0].NodeID} + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() - // Left only one preferred execution node - params.PreferredExecutionNodeIDs = []string{fixedENIDs[0].NodeID.String()} backend, err := New(params) suite.Require().NoError(err) @@ -2217,6 +2035,13 @@ func (suite *Suite) defaultBackendParams() Params { TxResultQueryMode: IndexQueryModeExecutionNodesOnly, LastFullBlockHeight: suite.lastFullBlockHeight, VersionControl: suite.versionControl, + ExecNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + suite.receipts, + suite.preferredExecutionNodeIDs, + suite.fixedExecutionNodeIDs, + ), } } diff --git a/engine/access/rpc/backend/backend_transactions.go b/engine/access/rpc/backend/backend_transactions.go index e7596fd2f65..5ba1cecb038 100644 --- a/engine/access/rpc/backend/backend_transactions.go +++ b/engine/access/rpc/backend/backend_transactions.go @@ -32,7 +32,6 @@ type backendTransactions struct { *TransactionsLocalDataProvider staticCollectionRPC accessproto.AccessAPIClient // rpc client tied to a fixed collection node transactions storage.Transactions - executionReceipts storage.ExecutionReceipts // NOTE: The transaction error message is currently only used by the access node and not by the observer node. // To avoid introducing unnecessary command line arguments in the observer, one case could be that the error // message cache is nil for the observer node. @@ -49,8 +48,9 @@ type backendTransactions struct { txResultCache *lru.Cache[flow.Identifier, *access.TransactionResult] txResultQueryMode IndexQueryMode - systemTxID flow.Identifier - systemTx *flow.TransactionBody + systemTxID flow.Identifier + systemTx *flow.TransactionBody + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } var _ TransactionErrorMessage = (*backendTransactions)(nil) @@ -411,14 +411,9 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromExecutionNode( BlockId: blockID[:], } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -573,14 +568,9 @@ func (b *backendTransactions) getTransactionResultByIndexFromExecutionNode( Index: index, } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -765,14 +755,9 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode( TransactionId: transactionID[:], } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { // if no execution receipt were found, return a NotFound GRPC error @@ -1011,14 +996,9 @@ func (b *backendTransactions) LookupErrorMessageByTransactionID( } } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -1071,14 +1051,9 @@ func (b *backendTransactions) LookupErrorMessageByIndex( } } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -1136,13 +1111,9 @@ func (b *backendTransactions) LookupErrorMessagesByBlockID( } } - execNodes, err := commonrpc.ExecutionNodesForBlockID(ctx, + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( + ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { diff --git a/engine/access/rpc/backend/backend_transactions_test.go b/engine/access/rpc/backend/backend_transactions_test.go index dabdd33bbbf..eb87fe4415d 100644 --- a/engine/access/rpc/backend/backend_transactions_test.go +++ b/engine/access/rpc/backend/backend_transactions_test.go @@ -355,6 +355,8 @@ func (suite *Suite) TestLookupTransactionErrorMessageByTransactionID_HappyPath() suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() params.TxResultErrorMessages = suite.txErrorMessages @@ -362,7 +364,6 @@ func (suite *Suite) TestLookupTransactionErrorMessageByTransactionID_HappyPath() suite.Run("happy path from EN", func() { // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Mock the cache lookup for the transaction error message, returning "not found". suite.txErrorMessages.On("ByBlockIDTransactionID", blockId, failedTxId). @@ -433,11 +434,12 @@ func (suite *Suite) TestLookupTransactionErrorMessageByTransactionID_FailedToFet reporter.On("LowestIndexedHeight").Return(block.Header.Height, nil) reporter.On("HighestIndexedHeight").Return(block.Header.Height+10, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // The connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() // Initialize the transaction results index with the mock reporter. - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() params.TxResultsIndex = index.NewTransactionResultsIndex(index.NewReporter(), suite.transactionResults) err := params.TxResultsIndex.Initialize(reporter) suite.Require().NoError(err) @@ -535,6 +537,8 @@ func (suite *Suite) TestLookupTransactionErrorMessageByIndex_HappyPath() { suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() params.TxResultErrorMessages = suite.txErrorMessages @@ -542,7 +546,6 @@ func (suite *Suite) TestLookupTransactionErrorMessageByIndex_HappyPath() { suite.Run("happy path from EN", func() { // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Mock the cache lookup for the transaction error message, returning "not found". suite.txErrorMessages.On("ByBlockIDTransactionIndex", blockId, failedTxIndex). @@ -618,10 +621,11 @@ func (suite *Suite) TestLookupTransactionErrorMessageByIndex_FailedToFetch() { reporter.On("LowestIndexedHeight").Return(block.Header.Height, nil) reporter.On("HighestIndexedHeight").Return(block.Header.Height+10, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Initialize the transaction results index with the mock reporter. params.TxResultsIndex = index.NewTransactionResultsIndex(index.NewReporter(), suite.transactionResults) err := params.TxResultsIndex.Initialize(reporter) @@ -724,6 +728,8 @@ func (suite *Suite) TestLookupTransactionErrorMessagesByBlockID_HappyPath() { suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() params.TxResultErrorMessages = suite.txErrorMessages @@ -731,7 +737,6 @@ func (suite *Suite) TestLookupTransactionErrorMessagesByBlockID_HappyPath() { suite.Run("happy path from EN", func() { // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Mock the cache lookup for the transaction error messages, returning "not found". suite.txErrorMessages.On("ByBlockID", blockId). @@ -827,10 +832,11 @@ func (suite *Suite) TestLookupTransactionErrorMessagesByBlockID_FailedToFetch() reporter.On("LowestIndexedHeight").Return(block.Header.Height, nil) reporter.On("HighestIndexedHeight").Return(block.Header.Height+10, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Initialize the transaction results index with the mock reporter. params.TxResultsIndex = index.NewTransactionResultsIndex(index.NewReporter(), suite.transactionResults) err := params.TxResultsIndex.Initialize(reporter) @@ -1315,13 +1321,13 @@ func (suite *Suite) TestTransactionResultFromStorage() { err := indexReporter.Initialize(reporter) suite.Require().NoError(err) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + // Set up the backend parameters and the backend instance params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() params.TxResultQueryMode = IndexQueryModeLocalOnly - params.EventsIndex = index.NewEventsIndex(indexReporter, suite.events) params.TxResultsIndex = index.NewTransactionResultsIndex(indexReporter, suite.transactionResults) @@ -1402,13 +1408,13 @@ func (suite *Suite) TestTransactionByIndexFromStorage() { err := indexReporter.Initialize(reporter) suite.Require().NoError(err) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + // Set up the backend parameters and the backend instance params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() params.TxResultQueryMode = IndexQueryModeLocalOnly - params.EventsIndex = index.NewEventsIndex(indexReporter, suite.events) params.TxResultsIndex = index.NewTransactionResultsIndex(indexReporter, suite.transactionResults) @@ -1495,15 +1501,14 @@ func (suite *Suite) TestTransactionResultsByBlockIDFromStorage() { err := indexReporter.Initialize(reporter) suite.Require().NoError(err) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + // Set up the state and snapshot mocks and the backend instance params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() - params.EventsIndex = index.NewEventsIndex(indexReporter, suite.events) params.TxResultsIndex = index.NewTransactionResultsIndex(indexReporter, suite.transactionResults) - params.TxResultQueryMode = IndexQueryModeLocalOnly backend, err := New(params) diff --git a/engine/access/rpc/backend/node_selector.go b/engine/access/rpc/backend/node_selector.go index 7d9ee12f64c..4aab3a89ca5 100644 --- a/engine/access/rpc/backend/node_selector.go +++ b/engine/access/rpc/backend/node_selector.go @@ -24,6 +24,19 @@ type NodeSelectorFactory struct { circuitBreakerEnabled bool } +// NewNodeSelectorFactory creates a new instance of NodeSelectorFactory with the provided circuit breaker configuration. +// +// When `circuitBreakerEnabled` is set to true, nodes will be selected using a pseudo-random sampling mechanism and picked in-order. +// When set to false, nodes will be selected in the order they are proposed, without any changes. +// +// Parameters: +// - circuitBreakerEnabled: A boolean that controls whether the circuit breaker is enabled for node selection. +func NewNodeSelectorFactory(circuitBreakerEnabled bool) *NodeSelectorFactory { + return &NodeSelectorFactory{ + circuitBreakerEnabled: circuitBreakerEnabled, + } +} + // SelectNodes selects the configured number of node identities from the provided list of nodes // and returns the node selector to iterate through them. func (n *NodeSelectorFactory) SelectNodes(nodes flow.IdentitySkeletonList) (NodeSelector, error) { diff --git a/engine/common/rpc/utils.go b/engine/common/rpc/execution_node_identities_provider.go similarity index 75% rename from engine/common/rpc/utils.go rename to engine/common/rpc/execution_node_identities_provider.go index 60eceff5bb3..75a1f0b33bf 100644 --- a/engine/common/rpc/utils.go +++ b/engine/common/rpc/execution_node_identities_provider.go @@ -34,17 +34,52 @@ func IdentifierList(ids []string) (flow.IdentifierList, error) { return idList, nil } +// ExecutionNodeIdentitiesProvider is a container for elements required to retrieve +// execution node identities for a given block ID. +type ExecutionNodeIdentitiesProvider struct { + log zerolog.Logger + + executionReceipts storage.ExecutionReceipts + state protocol.State + + preferredENIdentifiers flow.IdentifierList + fixedENIdentifiers flow.IdentifierList +} + +// NewExecutionNodeIdentitiesProvider creates and returns a new instance of +// ExecutionNodeIdentitiesProvider. +// +// Parameters: +// - log: The logger to use for logging. +// - state: The protocol state used for retrieving block information. +// - executionReceipts: A storage.ExecutionReceipts object that contains the execution receipts +// for blocks. +// - preferredENIdentifiers: A flow.IdentifierList of preferred execution node identifiers that +// are prioritized during selection. +// - fixedENIdentifiers: A flow.IdentifierList of fixed execution node identifiers that are +// always considered if available. +func NewExecutionNodeIdentitiesProvider( + log zerolog.Logger, + state protocol.State, + executionReceipts storage.ExecutionReceipts, + preferredENIdentifiers flow.IdentifierList, + fixedENIdentifiers flow.IdentifierList, +) *ExecutionNodeIdentitiesProvider { + return &ExecutionNodeIdentitiesProvider{ + log: log, + executionReceipts: executionReceipts, + state: state, + preferredENIdentifiers: preferredENIdentifiers, + fixedENIdentifiers: fixedENIdentifiers, + } +} + // ExecutionNodesForBlockID returns upto maxNodesCnt number of randomly chosen execution node identities // which have executed the given block ID. // If no such execution node is found, an InsufficientExecutionReceipts error is returned. -func ExecutionNodesForBlockID( +func (e *ExecutionNodeIdentitiesProvider) ExecutionNodesForBlockID( ctx context.Context, blockID flow.Identifier, - executionReceipts storage.ExecutionReceipts, - state protocol.State, - log zerolog.Logger, - preferredENIdentifiers flow.IdentifierList, - fixedENIdentifiers flow.IdentifierList, ) (flow.IdentitySkeletonList, error) { var ( executorIDs flow.IdentifierList @@ -53,10 +88,10 @@ func ExecutionNodesForBlockID( // check if the block ID is of the root block. If it is then don't look for execution receipts since they // will not be present for the root block. - rootBlock := state.Params().FinalizedRoot() + rootBlock := e.state.Params().FinalizedRoot() if rootBlock.ID() == blockID { - executorIdentities, err := state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) + executorIdentities, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) if err != nil { return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err) } @@ -64,7 +99,7 @@ func ExecutionNodesForBlockID( } else { // try to find at least minExecutionNodesCnt execution node ids from the execution receipts for the given blockID for attempt := 0; attempt < maxAttemptsForExecutionReceipt; attempt++ { - executorIDs, err = findAllExecutionNodes(blockID, executionReceipts, log) + executorIDs, err = e.findAllExecutionNodes(blockID) if err != nil { return nil, err } @@ -74,7 +109,7 @@ func ExecutionNodesForBlockID( } // log the attempt - log.Debug().Int("attempt", attempt).Int("max_attempt", maxAttemptsForExecutionReceipt). + e.log.Debug().Int("attempt", attempt).Int("max_attempt", maxAttemptsForExecutionReceipt). Int("execution_receipts_found", len(executorIDs)). Str("block_id", blockID.String()). Msg("insufficient execution receipts") @@ -93,7 +128,7 @@ func ExecutionNodesForBlockID( receiptCnt := len(executorIDs) // if less than minExecutionNodesCnt execution receipts have been received so far, then return random ENs if receiptCnt < minExecutionNodesCnt { - newExecutorIDs, err := state.AtBlockID(blockID).Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) + newExecutorIDs, err := e.state.AtBlockID(blockID).Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) if err != nil { return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err) } @@ -102,7 +137,7 @@ func ExecutionNodesForBlockID( } // choose from the preferred or fixed execution nodes - subsetENs, err := chooseExecutionNodes(state, executorIDs, preferredENIdentifiers, fixedENIdentifiers) + subsetENs, err := e.chooseExecutionNodes(executorIDs) if err != nil { return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err) } @@ -116,13 +151,11 @@ func ExecutionNodesForBlockID( // findAllExecutionNodes find all the execution nodes ids from the execution receipts that have been received for the // given blockID -func findAllExecutionNodes( +func (e *ExecutionNodeIdentitiesProvider) findAllExecutionNodes( blockID flow.Identifier, - executionReceipts storage.ExecutionReceipts, - log zerolog.Logger, ) (flow.IdentifierList, error) { // lookup the receipt's storage with the block ID - allReceipts, err := executionReceipts.ByBlockID(blockID) + allReceipts, err := e.executionReceipts.ByBlockID(blockID) if err != nil { return nil, fmt.Errorf("failed to retreive execution receipts for block ID %v: %w", blockID, err) } @@ -150,7 +183,7 @@ func findAllExecutionNodes( // if there are more than one execution result for the same block ID, log as error if executionResultGroupedMetaList.NumberGroups() > 1 { identicalReceiptsStr := fmt.Sprintf("%v", flow.GetIDs(allReceipts)) - log.Error(). + e.log.Error(). Str("block_id", blockID.String()). Str("execution_receipts", identicalReceiptsStr). Msg("execution receipt mismatch") @@ -176,35 +209,32 @@ func findAllExecutionNodes( // If neither preferred nor fixed nodes are defined, then all execution node matching the executor IDs are returned. // e.g. If execution nodes in identity table are {1,2,3,4}, preferred ENs are defined as {2,3,4} // and the executor IDs is {1,2,3}, then {2, 3} is returned as the chosen subset of ENs -func chooseExecutionNodes( - state protocol.State, +func (e *ExecutionNodeIdentitiesProvider) chooseExecutionNodes( executorIDs flow.IdentifierList, - preferredENIdentifiers flow.IdentifierList, - fixedENIdentifiers flow.IdentifierList, ) (flow.IdentitySkeletonList, error) { - allENs, err := state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) + allENs, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) if err != nil { return nil, fmt.Errorf("failed to retrieve all execution IDs: %w", err) } // choose from preferred EN IDs - if len(preferredENIdentifiers) > 0 { - chosenIDs := ChooseFromPreferredENIDs(allENs, executorIDs, preferredENIdentifiers) + if len(e.preferredENIdentifiers) > 0 { + chosenIDs := e.ChooseFromPreferredENIDs(allENs, executorIDs) return chosenIDs.ToSkeleton(), nil } // if no preferred EN ID is found, then choose from the fixed EN IDs - if len(fixedENIdentifiers) > 0 { + if len(e.fixedENIdentifiers) > 0 { // choose fixed ENs which have executed the transaction chosenIDs := allENs.Filter(filter.And( - filter.HasNodeID[flow.Identity](fixedENIdentifiers...), + filter.HasNodeID[flow.Identity](e.fixedENIdentifiers...), filter.HasNodeID[flow.Identity](executorIDs...), )) if len(chosenIDs) > 0 { return chosenIDs.ToSkeleton(), nil } // if no such ENs are found, then just choose all fixed ENs - chosenIDs = allENs.Filter(filter.HasNodeID[flow.Identity](fixedENIdentifiers...)) + chosenIDs = allENs.Filter(filter.HasNodeID[flow.Identity](e.fixedENIdentifiers...)) return chosenIDs.ToSkeleton(), nil } @@ -218,15 +248,15 @@ func chooseExecutionNodes( // 1. Use any EN with a receipt. // 2. Use any preferred node not already selected. // 3. Use any EN not already selected. -func ChooseFromPreferredENIDs(allENs flow.IdentityList, +func (e *ExecutionNodeIdentitiesProvider) ChooseFromPreferredENIDs( + allENs flow.IdentityList, executorIDs flow.IdentifierList, - preferredENIdentifiers flow.IdentifierList, ) flow.IdentityList { var chosenIDs flow.IdentityList // filter for both preferred and executor IDs chosenIDs = allENs.Filter(filter.And( - filter.HasNodeID[flow.Identity](preferredENIdentifiers...), + filter.HasNodeID[flow.Identity](e.preferredENIdentifiers...), filter.HasNodeID[flow.Identity](executorIDs...), )) @@ -255,7 +285,7 @@ func ChooseFromPreferredENIDs(allENs flow.IdentityList, } // add any preferred node not already selected - preferredENs := allENs.Filter(filter.HasNodeID[flow.Identity](preferredENIdentifiers...)) + preferredENs := allENs.Filter(filter.HasNodeID[flow.Identity](e.preferredENIdentifiers...)) addIfNotExists(preferredENs) if len(chosenIDs) >= MaxNodesCnt { return chosenIDs diff --git a/engine/common/rpc/execution_node_identities_provider_test.go b/engine/common/rpc/execution_node_identities_provider_test.go new file mode 100644 index 00000000000..2b033e3dac0 --- /dev/null +++ b/engine/common/rpc/execution_node_identities_provider_test.go @@ -0,0 +1,266 @@ +package rpc_test + +import ( + "context" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/engine/access/rpc/backend" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" + "github.com/onflow/flow-go/model/flow" + protocol "github.com/onflow/flow-go/state/protocol/mock" + storagemock "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/utils/unittest" +) + +// ENIdentitiesProviderSuite is a test suite for testing the ExecutionNodeIdentitiesProvider. +type ENIdentitiesProviderSuite struct { + suite.Suite + + state *protocol.State + snapshot *protocol.Snapshot + log zerolog.Logger + + receipts *storagemock.ExecutionReceipts +} + +func TestHandler(t *testing.T) { + suite.Run(t, new(ENIdentitiesProviderSuite)) +} + +// SetupTest initializes the test suite with mock state and receipts storage. +func (suite *ENIdentitiesProviderSuite) SetupTest() { + suite.log = zerolog.New(zerolog.NewConsoleWriter()) + suite.state = new(protocol.State) + suite.snapshot = new(protocol.Snapshot) + suite.receipts = new(storagemock.ExecutionReceipts) + + header := unittest.BlockHeaderFixture() + params := new(protocol.Params) + params.On("FinalizedRoot").Return(header, nil) + suite.state.On("Params").Return(params) +} + +// TestExecutionNodesForBlockID tests the ExecutionNodesForBlockID function. +// This function is responsible for retrieving execution nodes used to serve +// all API calls that interact with execution nodes. +func (suite *ENIdentitiesProviderSuite) TestExecutionNodesForBlockID() { + totalReceipts := 5 + + block := unittest.BlockFixture() + + // generate one execution node identities for each receipt assuming that each ER is generated by a unique exec node + allExecutionNodes := unittest.IdentityListFixture(totalReceipts, unittest.WithRole(flow.RoleExecution)) + + // one execution result for all receipts for this block + executionResult := unittest.ExecutionResultFixture() + + // generate execution receipts + receipts := make(flow.ExecutionReceiptList, totalReceipts) + for j := 0; j < totalReceipts; j++ { + r := unittest.ReceiptForBlockFixture(&block) + r.ExecutorID = allExecutionNodes[j].NodeID + er := *executionResult + r.ExecutionResult = er + receipts[j] = r + } + + currentAttempt := 0 + attempt1Receipts, attempt2Receipts, attempt3Receipts := receipts, receipts, receipts + + // setup receipts storage mock to return different list of receipts on each call + suite.receipts. + On("ByBlockID", block.ID()).Return( + func(id flow.Identifier) flow.ExecutionReceiptList { + switch currentAttempt { + case 0: + currentAttempt++ + return attempt1Receipts + case 1: + currentAttempt++ + return attempt2Receipts + default: + currentAttempt = 0 + return attempt3Receipts + } + }, + func(id flow.Identifier) error { return nil }) + + suite.snapshot.On("Identities", mock.Anything).Return( + func(filter flow.IdentityFilter[flow.Identity]) flow.IdentityList { + // apply the filter passed in to the list of all the execution nodes + return allExecutionNodes.Filter(filter) + }, + func(flow.IdentityFilter[flow.Identity]) error { return nil }) + suite.state.On("Final").Return(suite.snapshot, nil).Maybe() + + var preferredENIdentifiers flow.IdentifierList + var fixedENIdentifiers flow.IdentifierList + + testExecutionNodesForBlockID := func(preferredENs, fixedENs, expectedENs flow.IdentityList) { + + if preferredENs != nil { + preferredENIdentifiers = preferredENs.NodeIDs() + } + if fixedENs != nil { + fixedENIdentifiers = fixedENs.NodeIDs() + } + + if expectedENs == nil { + expectedENs = flow.IdentityList{} + } + + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + suite.receipts, + preferredENIdentifiers, + fixedENIdentifiers, + ) + + allExecNodes, err := execNodeIdentitiesProvider.ExecutionNodesForBlockID(context.Background(), block.ID()) + require.NoError(suite.T(), err) + + execNodeSelectorFactory := backend.NewNodeSelectorFactory(false) + execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) + require.NoError(suite.T(), err) + + actualList := flow.IdentitySkeletonList{} + for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { + actualList = append(actualList, actual) + } + + { + expectedENs := expectedENs.ToSkeleton() + if len(expectedENs) > commonrpc.MaxNodesCnt { + for _, actual := range actualList { + require.Contains(suite.T(), expectedENs, actual) + } + } else { + require.ElementsMatch(suite.T(), actualList, expectedENs) + } + } + } + // if we don't find sufficient receipts, ExecutionNodesForBlockID should return a list of random ENs + suite.Run("insufficient receipts return random ENs in State", func() { + // return no receipts at all attempts + attempt1Receipts = flow.ExecutionReceiptList{} + attempt2Receipts = flow.ExecutionReceiptList{} + attempt3Receipts = flow.ExecutionReceiptList{} + suite.state.On("AtBlockID", mock.Anything).Return(suite.snapshot) + + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + suite.receipts, + flow.IdentifierList{}, + flow.IdentifierList{}, + ) + + allExecNodes, err := execNodeIdentitiesProvider.ExecutionNodesForBlockID(context.Background(), block.ID()) + require.NoError(suite.T(), err) + + execNodeSelectorFactory := backend.NewNodeSelectorFactory(false) + execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) + require.NoError(suite.T(), err) + + actualList := flow.IdentitySkeletonList{} + for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { + actualList = append(actualList, actual) + } + + require.Equal(suite.T(), len(actualList), commonrpc.MaxNodesCnt) + }) + + // if no preferred or fixed ENs are specified, the ExecutionNodesForBlockID function should + // return the exe node list without a filter + suite.Run("no preferred or fixed ENs", func() { + testExecutionNodesForBlockID(nil, nil, allExecutionNodes) + }) + // if only fixed ENs are specified, the ExecutionNodesForBlockID function should + // return the fixed ENs list + suite.Run("two fixed ENs with zero preferred EN", func() { + // mark the first two ENs as fixed + fixedENs := allExecutionNodes[0:2] + expectedList := fixedENs + testExecutionNodesForBlockID(nil, fixedENs, expectedList) + }) + // if only preferred ENs are specified, the ExecutionNodesForBlockID function should + // return the preferred ENs list + suite.Run("two preferred ENs with zero fixed EN", func() { + // mark the first two ENs as preferred + preferredENs := allExecutionNodes[0:2] + expectedList := allExecutionNodes[0:commonrpc.MaxNodesCnt] + testExecutionNodesForBlockID(preferredENs, nil, expectedList) + }) + // if both are specified, the ExecutionNodesForBlockID function should + // return the preferred ENs list + suite.Run("four fixed ENs of which two are preferred ENs", func() { + // mark the first four ENs as fixed + fixedENs := allExecutionNodes[0:5] + // mark the first two of the fixed ENs as preferred ENs + preferredENs := fixedENs[0:2] + expectedList := fixedENs[0:commonrpc.MaxNodesCnt] + testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList) + }) + // if both are specified, but the preferred ENs don't match the ExecutorIDs in the ER, + // the ExecutionNodesForBlockID function should return the fixed ENs list + suite.Run("four fixed ENs of which two are preferred ENs but have not generated the ER", func() { + // mark the first two ENs as fixed + fixedENs := allExecutionNodes[0:2] + // specify two ENs not specified in the ERs as preferred + preferredENs := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) + // add one more node ID besides of the fixed ENs list cause expected length of the list should be maxNodesCnt + expectedList := append(fixedENs, allExecutionNodes[2]) + testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList) + }) + // if execution receipts are not yet available, the ExecutionNodesForBlockID function should retry twice + suite.Run("retry execution receipt query", func() { + // on first attempt, no execution receipts are available + attempt1Receipts = flow.ExecutionReceiptList{} + // on second attempt ony one is available + attempt2Receipts = flow.ExecutionReceiptList{receipts[0]} + // on third attempt all receipts are available + attempt3Receipts = receipts + currentAttempt = 0 + // mark the first two ENs as preferred + preferredENs := allExecutionNodes[0:2] + expectedList := allExecutionNodes[0:commonrpc.MaxNodesCnt] + testExecutionNodesForBlockID(preferredENs, nil, expectedList) + }) + // if preferredENIdentifiers was set and there are less than maxNodesCnt nodes selected than check the order + // of adding ENs ids + suite.Run("add nodes in the correct order", func() { + // mark the first EN as preferred + preferredENIdentifiers = allExecutionNodes[0:1].NodeIDs() + // mark the fourth EN with receipt + executorIDs := allExecutionNodes[3:4].NodeIDs() + + receiptNodes := allExecutionNodes[3:4] // any EN with a receipt + preferredNodes := allExecutionNodes[0:1] // preferred EN node not already selected + additionalNode := allExecutionNodes[1:2] // any EN not already selected + + expectedOrder := flow.IdentityList{ + receiptNodes[0], + preferredNodes[0], + additionalNode[0], + } + + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + suite.receipts, + preferredENIdentifiers, + flow.IdentifierList{}, + ) + + chosenIDs := execNodeIdentitiesProvider.ChooseFromPreferredENIDs(allExecutionNodes, executorIDs) + + require.ElementsMatch(suite.T(), chosenIDs, expectedOrder) + require.Equal(suite.T(), len(chosenIDs), commonrpc.MaxNodesCnt) + }) +}