From 91773f39c8e1aa22ff5c91b954e496130e25a880 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Wed, 25 Sep 2024 19:34:31 +0300 Subject: [PATCH 1/6] Created ExecutionNodeIdentitiesProvider, refactored ExecutionNodesForBlockID, backend and ingestion engine, updated tests --- engine/access/ingestion/engine.go | 16 +- engine/access/rpc/backend/backend.go | 85 +++-- engine/access/rpc/backend/backend_accounts.go | 23 +- .../rpc/backend/backend_accounts_test.go | 19 +- engine/access/rpc/backend/backend_events.go | 29 +- .../access/rpc/backend/backend_events_test.go | 27 +- engine/access/rpc/backend/backend_scripts.go | 22 +- .../rpc/backend/backend_scripts_test.go | 23 +- engine/access/rpc/backend/backend_test.go | 198 +---------- .../rpc/backend/backend_transactions.go | 49 +-- engine/access/rpc/backend/node_selector.go | 13 + ... => execution_node_identities_provider.go} | 94 +++-- ...execution_node_identities_provider_test.go | 325 ++++++++++++++++++ 13 files changed, 535 insertions(+), 388 deletions(-) rename engine/common/rpc/{utils.go => execution_node_identities_provider.go} (74%) create mode 100644 engine/common/rpc/execution_node_identities_provider_test.go diff --git a/engine/access/ingestion/engine.go b/engine/access/ingestion/engine.go index ed31d33c776..8cf2479666d 100644 --- a/engine/access/ingestion/engine.go +++ b/engine/access/ingestion/engine.go @@ -108,10 +108,8 @@ type Engine struct { // metrics collectionExecutedMetric module.CollectionExecutedMetric - preferredENIdentifiers flow.IdentifierList - fixedENIdentifiers flow.IdentifierList - - backend *backend.Backend + execProvider *commonrpc.ExecutionNodeIdentitiesProvider + backend *backend.Backend } var _ network.MessageProcessor = (*Engine)(nil) @@ -194,8 +192,7 @@ func New( executionReceiptsQueue: executionReceiptsQueue, messageHandler: messageHandler, backend: backend, - preferredENIdentifiers: preferredENIdentifiers, - fixedENIdentifiers: fixedENIdentifiers, + execProvider: commonrpc.NewExecutionNodeIdentitiesProvider(log, state, executionReceipts, preferredENIdentifiers, fixedENIdentifiers), } // jobqueue Jobs object that tracks finalized blocks by height. This is used by the finalizedBlockConsumer @@ -421,14 +418,9 @@ func (e *Engine) handleTransactionResultErrorMessages(ctx context.Context, block // retrieves error messages from the backend if they do not already exist in storage if !exists { - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := e.execProvider.ExecutionNodesForBlockID( ctx, blockID, - e.executionReceipts, - e.state, - e.log, - e.preferredENIdentifiers, - e.preferredENIdentifiers, ) if err != nil { // in case querying nodes by existing execution receipts failed, diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 2252d5d3252..c802561ec2e 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -41,11 +41,6 @@ const DefaultLoggedScriptsCacheSize = 1_000_000 // DefaultConnectionPoolSize is the default size for the connection pool to collection and execution nodes const DefaultConnectionPoolSize = 250 -var ( - preferredENIdentifiers flow.IdentifierList - fixedENIdentifiers flow.IdentifierList -) - // Backend implements the Access API. // // It is composed of several sub-backends that implement part of the Access API. @@ -146,6 +141,18 @@ func New(params Params) (*Backend, error) { } systemTxID := systemTx.ID() + preferredENIdentifiers, err := commonrpc.IdentifierList(params.PreferredExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) + } + + fixedENIdentifiers, err := commonrpc.IdentifierList(params.FixedExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) + } + + execProvider := commonrpc.NewExecutionNodeIdentitiesProvider(params.Log, params.State, params.ExecutionReceipts, preferredENIdentifiers, fixedENIdentifiers) + transactionsLocalDataProvider := &TransactionsLocalDataProvider{ state: params.State, collections: params.Collections, @@ -161,28 +168,28 @@ func New(params Params) (*Backend, error) { BlockTracker: params.BlockTracker, // create the sub-backends backendScripts: backendScripts{ - log: params.Log, - headers: params.Headers, - executionReceipts: params.ExecutionReceipts, - connFactory: params.ConnFactory, - state: params.State, - metrics: params.AccessMetrics, - loggedScripts: loggedScripts, - nodeCommunicator: params.Communicator, - scriptExecutor: params.ScriptExecutor, - scriptExecMode: params.ScriptExecutionMode, + log: params.Log, + headers: params.Headers, + connFactory: params.ConnFactory, + state: params.State, + metrics: params.AccessMetrics, + loggedScripts: loggedScripts, + nodeCommunicator: params.Communicator, + scriptExecutor: params.ScriptExecutor, + scriptExecMode: params.ScriptExecutionMode, + execProvider: execProvider, }, backendEvents: backendEvents{ - log: params.Log, - chain: params.ChainID.Chain(), - state: params.State, - headers: params.Headers, - executionReceipts: params.ExecutionReceipts, - connFactory: params.ConnFactory, - maxHeightRange: params.MaxHeightRange, - nodeCommunicator: params.Communicator, - queryMode: params.EventQueryMode, - eventsIndex: params.EventsIndex, + log: params.Log, + chain: params.ChainID.Chain(), + state: params.State, + headers: params.Headers, + connFactory: params.ConnFactory, + maxHeightRange: params.MaxHeightRange, + nodeCommunicator: params.Communicator, + queryMode: params.EventQueryMode, + eventsIndex: params.EventsIndex, + execProvider: execProvider, }, backendBlockHeaders: backendBlockHeaders{ headers: params.Headers, @@ -193,14 +200,14 @@ func New(params Params) (*Backend, error) { state: params.State, }, backendAccounts: backendAccounts{ - log: params.Log, - state: params.State, - headers: params.Headers, - executionReceipts: params.ExecutionReceipts, - connFactory: params.ConnFactory, - nodeCommunicator: params.Communicator, - scriptExecutor: params.ScriptExecutor, - scriptExecMode: params.ScriptExecutionMode, + log: params.Log, + state: params.State, + headers: params.Headers, + connFactory: params.ConnFactory, + nodeCommunicator: params.Communicator, + scriptExecutor: params.ScriptExecutor, + scriptExecMode: params.ScriptExecutionMode, + execProvider: execProvider, }, backendExecutionResults: backendExecutionResults{ executionResults: params.ExecutionResults, @@ -239,7 +246,6 @@ func New(params Params) (*Backend, error) { staticCollectionRPC: params.CollectionRPC, chainID: params.ChainID, transactions: params.Transactions, - executionReceipts: params.ExecutionReceipts, txResultErrorMessages: params.TxResultErrorMessages, transactionValidator: txValidator, transactionMetrics: params.AccessMetrics, @@ -251,6 +257,7 @@ func New(params Params) (*Backend, error) { txResultQueryMode: params.TxResultQueryMode, systemTx: systemTx, systemTxID: systemTxID, + execProvider: execProvider, } // TODO: The TransactionErrorMessage interface should be reorganized in future, as it is implemented in backendTransactions but used in TransactionsLocalDataProvider, and its initialization is somewhat quirky. @@ -267,16 +274,6 @@ func New(params Params) (*Backend, error) { retry.SetBackend(b) - preferredENIdentifiers, err = commonrpc.IdentifierList(params.PreferredExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) - } - - fixedENIdentifiers, err = commonrpc.IdentifierList(params.FixedExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) - } - return b, nil } diff --git a/engine/access/rpc/backend/backend_accounts.go b/engine/access/rpc/backend/backend_accounts.go index fe9f79f3483..a104d8e2d58 100644 --- a/engine/access/rpc/backend/backend_accounts.go +++ b/engine/access/rpc/backend/backend_accounts.go @@ -25,14 +25,14 @@ import ( ) type backendAccounts struct { - log zerolog.Logger - state protocol.State - headers storage.Headers - executionReceipts storage.ExecutionReceipts - connFactory connection.ConnectionFactory - nodeCommunicator Communicator - scriptExecutor execution.ScriptExecutor - scriptExecMode IndexQueryMode + log zerolog.Logger + state protocol.State + headers storage.Headers + connFactory connection.ConnectionFactory + nodeCommunicator Communicator + scriptExecutor execution.ScriptExecutor + scriptExecMode IndexQueryMode + execProvider *commonrpc.ExecutionNodeIdentitiesProvider } // GetAccount returns the account details at the latest sealed block. @@ -420,14 +420,9 @@ func (b *backendAccounts) getAccountFromAnyExeNode( BlockId: blockID[:], } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { return nil, rpc.ConvertError(err, "failed to find execution node to query", codes.Internal) diff --git a/engine/access/rpc/backend/backend_accounts_test.go b/engine/access/rpc/backend/backend_accounts_test.go index 50ca272da03..709ca93e132 100644 --- a/engine/access/rpc/backend/backend_accounts_test.go +++ b/engine/access/rpc/backend/backend_accounts_test.go @@ -13,6 +13,7 @@ import ( access "github.com/onflow/flow-go/engine/access/mock" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" execmock "github.com/onflow/flow-go/module/execution/mock" @@ -77,12 +78,18 @@ func (s *BackendAccountsSuite) SetupTest() { func (s *BackendAccountsSuite) defaultBackend() *backendAccounts { return &backendAccounts{ - log: s.log, - state: s.state, - headers: s.headers, - executionReceipts: s.receipts, - connFactory: s.connectionFactory, - nodeCommunicator: NewNodeCommunicator(false), + log: s.log, + state: s.state, + headers: s.headers, + connFactory: s.connectionFactory, + nodeCommunicator: NewNodeCommunicator(false), + execProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.state, + s.receipts, + flow.IdentifierList{}, + flow.IdentifierList{}, + ), } } diff --git a/engine/access/rpc/backend/backend_events.go b/engine/access/rpc/backend/backend_events.go index 23a47085a2c..2d671bb6689 100644 --- a/engine/access/rpc/backend/backend_events.go +++ b/engine/access/rpc/backend/backend_events.go @@ -28,16 +28,16 @@ import ( ) type backendEvents struct { - headers storage.Headers - executionReceipts storage.ExecutionReceipts - state protocol.State - chain flow.Chain - connFactory connection.ConnectionFactory - log zerolog.Logger - maxHeightRange uint - nodeCommunicator Communicator - queryMode IndexQueryMode - eventsIndex *index.EventsIndex + headers storage.Headers + state protocol.State + chain flow.Chain + connFactory connection.ConnectionFactory + log zerolog.Logger + maxHeightRange uint + nodeCommunicator Communicator + queryMode IndexQueryMode + eventsIndex *index.EventsIndex + execProvider *rpc.ExecutionNodeIdentitiesProvider } // blockMetadata is used to capture information about requested blocks to avoid repeated blockID @@ -303,13 +303,10 @@ func (b *backendEvents) getBlockEventsFromExecutionNode( // choose the last block ID to find the list of execution nodes lastBlockID := blockIDs[len(blockIDs)-1] - execNodes, err := rpc.ExecutionNodesForBlockID(ctx, + execNodes, err := b.execProvider.ExecutionNodesForBlockID( + ctx, lastBlockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers) + ) if err != nil { return nil, rpc.ConvertError(err, "failed to retrieve events from execution node", codes.Internal) } diff --git a/engine/access/rpc/backend/backend_events_test.go b/engine/access/rpc/backend/backend_events_test.go index b1b1a8c438d..08c94693a11 100644 --- a/engine/access/rpc/backend/backend_events_test.go +++ b/engine/access/rpc/backend/backend_events_test.go @@ -21,6 +21,7 @@ import ( "github.com/onflow/flow-go/engine/access/index" access "github.com/onflow/flow-go/engine/access/mock" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" @@ -175,16 +176,22 @@ func (s *BackendEventsSuite) SetupTest() { func (s *BackendEventsSuite) defaultBackend() *backendEvents { return &backendEvents{ - log: s.log, - chain: s.chainID.Chain(), - state: s.state, - headers: s.headers, - executionReceipts: s.receipts, - connFactory: s.connectionFactory, - nodeCommunicator: NewNodeCommunicator(false), - maxHeightRange: DefaultMaxHeightRange, - queryMode: IndexQueryModeExecutionNodesOnly, - eventsIndex: s.eventsIndex, + log: s.log, + chain: s.chainID.Chain(), + state: s.state, + headers: s.headers, + connFactory: s.connectionFactory, + nodeCommunicator: NewNodeCommunicator(false), + maxHeightRange: DefaultMaxHeightRange, + queryMode: IndexQueryModeExecutionNodesOnly, + eventsIndex: s.eventsIndex, + execProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.state, + s.receipts, + flow.IdentifierList{}, + flow.IdentifierList{}, + ), } } diff --git a/engine/access/rpc/backend/backend_scripts.go b/engine/access/rpc/backend/backend_scripts.go index 11351cbf620..3ad1e48b79b 100644 --- a/engine/access/rpc/backend/backend_scripts.go +++ b/engine/access/rpc/backend/backend_scripts.go @@ -28,16 +28,16 @@ import ( const uniqueScriptLoggingTimeWindow = 10 * time.Minute type backendScripts struct { - log zerolog.Logger - headers storage.Headers - executionReceipts storage.ExecutionReceipts - state protocol.State - connFactory connection.ConnectionFactory - metrics module.BackendScriptsMetrics - loggedScripts *lru.Cache[[md5.Size]byte, time.Time] - nodeCommunicator Communicator - scriptExecutor execution.ScriptExecutor - scriptExecMode IndexQueryMode + log zerolog.Logger + headers storage.Headers + state protocol.State + connFactory connection.ConnectionFactory + metrics module.BackendScriptsMetrics + loggedScripts *lru.Cache[[md5.Size]byte, time.Time] + nodeCommunicator Communicator + scriptExecutor execution.ScriptExecutor + scriptExecMode IndexQueryMode + execProvider *commonrpc.ExecutionNodeIdentitiesProvider } // scriptExecutionRequest encapsulates the data needed to execute a script to make it easier @@ -225,7 +225,7 @@ func (b *backendScripts) executeScriptOnAvailableExecutionNodes( r *scriptExecutionRequest, ) ([]byte, time.Duration, error) { // find few execution nodes which have executed the block earlier and provided an execution receipt for it - executors, err := commonrpc.ExecutionNodesForBlockID(ctx, r.blockID, b.executionReceipts, b.state, b.log, preferredENIdentifiers, fixedENIdentifiers) + executors, err := b.execProvider.ExecutionNodesForBlockID(ctx, r.blockID) if err != nil { return nil, 0, status.Errorf(codes.Internal, "failed to find script executors at blockId %v: %v", r.blockID.String(), err) } diff --git a/engine/access/rpc/backend/backend_scripts_test.go b/engine/access/rpc/backend/backend_scripts_test.go index 398d9ac33ec..6365b4624c3 100644 --- a/engine/access/rpc/backend/backend_scripts_test.go +++ b/engine/access/rpc/backend/backend_scripts_test.go @@ -18,6 +18,7 @@ import ( access "github.com/onflow/flow-go/engine/access/mock" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" fvmerrors "github.com/onflow/flow-go/fvm/errors" "github.com/onflow/flow-go/model/flow" execmock "github.com/onflow/flow-go/module/execution/mock" @@ -96,14 +97,20 @@ func (s *BackendScriptsSuite) defaultBackend() *backendScripts { s.Require().NoError(err) return &backendScripts{ - log: s.log, - metrics: metrics.NewNoopCollector(), - state: s.state, - headers: s.headers, - executionReceipts: s.receipts, - loggedScripts: loggedScripts, - connFactory: s.connectionFactory, - nodeCommunicator: NewNodeCommunicator(false), + log: s.log, + metrics: metrics.NewNoopCollector(), + state: s.state, + headers: s.headers, + loggedScripts: loggedScripts, + connFactory: s.connectionFactory, + nodeCommunicator: NewNodeCommunicator(false), + execProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.state, + s.receipts, + flow.IdentifierList{}, + flow.IdentifierList{}, + ), } } diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index 22cb943bb25..c78136632ba 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -30,7 +30,6 @@ import ( backendmock "github.com/onflow/flow-go/engine/access/rpc/backend/mock" "github.com/onflow/flow-go/engine/access/rpc/connection" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" - commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/engine/common/version" "github.com/onflow/flow-go/fvm/blueprints" @@ -1333,12 +1332,11 @@ func (suite *Suite) TestTransactionPendingToFinalizedStatusTransition() { params := suite.defaultBackendParams() params.ConnFactory = connFactory params.MaxHeightRange = TEST_MAX_HEIGHT + params.PreferredExecutionNodeIDs = []string{receipts[0].ExecutorID.String()} backend, err := New(params) suite.Require().NoError(err) - preferredENIdentifiers = flow.IdentifierList{receipts[0].ExecutorID} - // should return pending status when we have not observed collection for the transaction suite.Run("pending", func() { currentState = flow.TransactionStatusPending @@ -1722,200 +1720,6 @@ func (suite *Suite) TestGetNetworkParameters() { suite.Require().Equal(expectedChainID, actual.ChainID) } -// TestExecutionNodesForBlockID tests the common method backend.ExecutionNodesForBlockID used for serving all API calls -// that need to talk to an execution node. -func (suite *Suite) TestExecutionNodesForBlockID() { - - totalReceipts := 5 - - block := unittest.BlockFixture() - - // generate one execution node identities for each receipt assuming that each ER is generated by a unique exec node - allExecutionNodes := unittest.IdentityListFixture(totalReceipts, unittest.WithRole(flow.RoleExecution)) - - // one execution result for all receipts for this block - executionResult := unittest.ExecutionResultFixture() - - // generate execution receipts - receipts := make(flow.ExecutionReceiptList, totalReceipts) - for j := 0; j < totalReceipts; j++ { - r := unittest.ReceiptForBlockFixture(&block) - r.ExecutorID = allExecutionNodes[j].NodeID - er := *executionResult - r.ExecutionResult = er - receipts[j] = r - } - - currentAttempt := 0 - attempt1Receipts, attempt2Receipts, attempt3Receipts := receipts, receipts, receipts - - // setup receipts storage mock to return different list of receipts on each call - suite.receipts. - On("ByBlockID", block.ID()).Return( - func(id flow.Identifier) flow.ExecutionReceiptList { - switch currentAttempt { - case 0: - currentAttempt++ - return attempt1Receipts - case 1: - currentAttempt++ - return attempt2Receipts - default: - currentAttempt = 0 - return attempt3Receipts - } - }, - func(id flow.Identifier) error { return nil }) - - suite.snapshot.On("Identities", mock.Anything).Return( - func(filter flow.IdentityFilter[flow.Identity]) flow.IdentityList { - // apply the filter passed in to the list of all the execution nodes - return allExecutionNodes.Filter(filter) - }, - func(flow.IdentityFilter[flow.Identity]) error { return nil }) - suite.state.On("Final").Return(suite.snapshot, nil).Maybe() - - testExecutionNodesForBlockID := func(preferredENs, fixedENs, expectedENs flow.IdentityList) { - - if preferredENs != nil { - preferredENIdentifiers = preferredENs.NodeIDs() - } - if fixedENs != nil { - fixedENIdentifiers = fixedENs.NodeIDs() - } - - if expectedENs == nil { - expectedENs = flow.IdentityList{} - } - - allExecNodes, err := commonrpc.ExecutionNodesForBlockID(context.Background(), block.ID(), suite.receipts, suite.state, suite.log, preferredENIdentifiers, fixedENIdentifiers) - require.NoError(suite.T(), err) - - execNodeSelectorFactory := NodeSelectorFactory{circuitBreakerEnabled: false} - execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) - require.NoError(suite.T(), err) - - actualList := flow.IdentitySkeletonList{} - for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { - actualList = append(actualList, actual) - } - - { - expectedENs := expectedENs.ToSkeleton() - if len(expectedENs) > commonrpc.MaxNodesCnt { - for _, actual := range actualList { - require.Contains(suite.T(), expectedENs, actual) - } - } else { - require.ElementsMatch(suite.T(), actualList, expectedENs) - } - } - } - // if we don't find sufficient receipts, ExecutionNodesForBlockID should return a list of random ENs - suite.Run("insufficient receipts return random ENs in State", func() { - // return no receipts at all attempts - attempt1Receipts = flow.ExecutionReceiptList{} - attempt2Receipts = flow.ExecutionReceiptList{} - attempt3Receipts = flow.ExecutionReceiptList{} - suite.state.On("AtBlockID", mock.Anything).Return(suite.snapshot) - - allExecNodes, err := commonrpc.ExecutionNodesForBlockID(context.Background(), block.ID(), suite.receipts, suite.state, suite.log, preferredENIdentifiers, - fixedENIdentifiers) - require.NoError(suite.T(), err) - - execNodeSelectorFactory := NodeSelectorFactory{circuitBreakerEnabled: false} - execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) - require.NoError(suite.T(), err) - - actualList := flow.IdentitySkeletonList{} - for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { - actualList = append(actualList, actual) - } - - require.Equal(suite.T(), len(actualList), commonrpc.MaxNodesCnt) - }) - - // if no preferred or fixed ENs are specified, the ExecutionNodesForBlockID function should - // return the exe node list without a filter - suite.Run("no preferred or fixed ENs", func() { - testExecutionNodesForBlockID(nil, nil, allExecutionNodes) - }) - // if only fixed ENs are specified, the ExecutionNodesForBlockID function should - // return the fixed ENs list - suite.Run("two fixed ENs with zero preferred EN", func() { - // mark the first two ENs as fixed - fixedENs := allExecutionNodes[0:2] - expectedList := fixedENs - testExecutionNodesForBlockID(nil, fixedENs, expectedList) - }) - // if only preferred ENs are specified, the ExecutionNodesForBlockID function should - // return the preferred ENs list - suite.Run("two preferred ENs with zero fixed EN", func() { - // mark the first two ENs as preferred - preferredENs := allExecutionNodes[0:2] - expectedList := allExecutionNodes[0:commonrpc.MaxNodesCnt] - testExecutionNodesForBlockID(preferredENs, nil, expectedList) - }) - // if both are specified, the ExecutionNodesForBlockID function should - // return the preferred ENs list - suite.Run("four fixed ENs of which two are preferred ENs", func() { - // mark the first four ENs as fixed - fixedENs := allExecutionNodes[0:5] - // mark the first two of the fixed ENs as preferred ENs - preferredENs := fixedENs[0:2] - expectedList := fixedENs[0:commonrpc.MaxNodesCnt] - testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList) - }) - // if both are specified, but the preferred ENs don't match the ExecutorIDs in the ER, - // the ExecutionNodesForBlockID function should return the fixed ENs list - suite.Run("four fixed ENs of which two are preferred ENs but have not generated the ER", func() { - // mark the first two ENs as fixed - fixedENs := allExecutionNodes[0:2] - // specify two ENs not specified in the ERs as preferred - preferredENs := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) - // add one more node ID besides of the fixed ENs list cause expected length of the list should be maxNodesCnt - expectedList := append(fixedENs, allExecutionNodes[2]) - testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList) - }) - // if execution receipts are not yet available, the ExecutionNodesForBlockID function should retry twice - suite.Run("retry execution receipt query", func() { - // on first attempt, no execution receipts are available - attempt1Receipts = flow.ExecutionReceiptList{} - // on second attempt ony one is available - attempt2Receipts = flow.ExecutionReceiptList{receipts[0]} - // on third attempt all receipts are available - attempt3Receipts = receipts - currentAttempt = 0 - // mark the first two ENs as preferred - preferredENs := allExecutionNodes[0:2] - expectedList := allExecutionNodes[0:commonrpc.MaxNodesCnt] - testExecutionNodesForBlockID(preferredENs, nil, expectedList) - }) - // if preferredENIdentifiers was set and there are less than maxNodesCnt nodes selected than check the order - // of adding ENs ids - suite.Run("add nodes in the correct order", func() { - // mark the first EN as preferred - preferredENIdentifiers = allExecutionNodes[0:1].NodeIDs() - // mark the fourth EN with receipt - executorIDs := allExecutionNodes[3:4].NodeIDs() - - receiptNodes := allExecutionNodes[3:4] // any EN with a receipt - preferredNodes := allExecutionNodes[0:1] // preferred EN node not already selected - additionalNode := allExecutionNodes[1:2] // any EN not already selected - - expectedOrder := flow.IdentityList{ - receiptNodes[0], - preferredNodes[0], - additionalNode[0], - } - - chosenIDs := commonrpc.ChooseFromPreferredENIDs(allExecutionNodes, executorIDs, preferredENIdentifiers) - - require.ElementsMatch(suite.T(), chosenIDs, expectedOrder) - require.Equal(suite.T(), len(chosenIDs), commonrpc.MaxNodesCnt) - }) -} - // TestGetTransactionResultEventEncodingVersion tests the GetTransactionResult function with different event encoding versions. func (suite *Suite) TestGetTransactionResultEventEncodingVersion() { suite.state.On("Sealed").Return(suite.snapshot, nil).Maybe() diff --git a/engine/access/rpc/backend/backend_transactions.go b/engine/access/rpc/backend/backend_transactions.go index a7f482f5e38..6f5e37a88e6 100644 --- a/engine/access/rpc/backend/backend_transactions.go +++ b/engine/access/rpc/backend/backend_transactions.go @@ -32,7 +32,6 @@ type backendTransactions struct { *TransactionsLocalDataProvider staticCollectionRPC accessproto.AccessAPIClient // rpc client tied to a fixed collection node transactions storage.Transactions - executionReceipts storage.ExecutionReceipts // NOTE: The transaction error message is currently only used by the access node and not by the observer node. // To avoid introducing unnecessary command line arguments in the observer, one case could be that the error // message cache is nil for the observer node. @@ -49,8 +48,9 @@ type backendTransactions struct { txResultCache *lru.Cache[flow.Identifier, *access.TransactionResult] txResultQueryMode IndexQueryMode - systemTxID flow.Identifier - systemTx *flow.TransactionBody + systemTxID flow.Identifier + systemTx *flow.TransactionBody + execProvider *commonrpc.ExecutionNodeIdentitiesProvider } var _ TransactionErrorMessage = (*backendTransactions)(nil) @@ -411,14 +411,9 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromExecutionNode( BlockId: blockID[:], } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -573,14 +568,9 @@ func (b *backendTransactions) getTransactionResultByIndexFromExecutionNode( Index: index, } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -765,14 +755,9 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode( TransactionId: transactionID[:], } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { // if no execution receipt were found, return a NotFound GRPC error @@ -1010,14 +995,9 @@ func (b *backendTransactions) LookupErrorMessageByTransactionID( } } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -1089,14 +1069,9 @@ func (b *backendTransactions) LookupErrorMessageByIndex( } } - execNodes, err := commonrpc.ExecutionNodesForBlockID( + execNodes, err := b.execProvider.ExecutionNodesForBlockID( ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { @@ -1166,13 +1141,9 @@ func (b *backendTransactions) LookupErrorMessagesByBlockID( } } - execNodes, err := commonrpc.ExecutionNodesForBlockID(ctx, + execNodes, err := b.execProvider.ExecutionNodesForBlockID( + ctx, blockID, - b.executionReceipts, - b.state, - b.log, - preferredENIdentifiers, - fixedENIdentifiers, ) if err != nil { if IsInsufficientExecutionReceipts(err) { diff --git a/engine/access/rpc/backend/node_selector.go b/engine/access/rpc/backend/node_selector.go index 7d9ee12f64c..4aab3a89ca5 100644 --- a/engine/access/rpc/backend/node_selector.go +++ b/engine/access/rpc/backend/node_selector.go @@ -24,6 +24,19 @@ type NodeSelectorFactory struct { circuitBreakerEnabled bool } +// NewNodeSelectorFactory creates a new instance of NodeSelectorFactory with the provided circuit breaker configuration. +// +// When `circuitBreakerEnabled` is set to true, nodes will be selected using a pseudo-random sampling mechanism and picked in-order. +// When set to false, nodes will be selected in the order they are proposed, without any changes. +// +// Parameters: +// - circuitBreakerEnabled: A boolean that controls whether the circuit breaker is enabled for node selection. +func NewNodeSelectorFactory(circuitBreakerEnabled bool) *NodeSelectorFactory { + return &NodeSelectorFactory{ + circuitBreakerEnabled: circuitBreakerEnabled, + } +} + // SelectNodes selects the configured number of node identities from the provided list of nodes // and returns the node selector to iterate through them. func (n *NodeSelectorFactory) SelectNodes(nodes flow.IdentitySkeletonList) (NodeSelector, error) { diff --git a/engine/common/rpc/utils.go b/engine/common/rpc/execution_node_identities_provider.go similarity index 74% rename from engine/common/rpc/utils.go rename to engine/common/rpc/execution_node_identities_provider.go index 60eceff5bb3..0a56c53a08a 100644 --- a/engine/common/rpc/utils.go +++ b/engine/common/rpc/execution_node_identities_provider.go @@ -34,17 +34,54 @@ func IdentifierList(ids []string) (flow.IdentifierList, error) { return idList, nil } +// ExecutionNodeIdentitiesProvider is a container for elements required to retrieve +// execution node identities for a given block ID. +type ExecutionNodeIdentitiesProvider struct { + log zerolog.Logger + + executionReceipts storage.ExecutionReceipts + state protocol.State + + preferredENIdentifiers flow.IdentifierList + fixedENIdentifiers flow.IdentifierList +} + +// NewExecutionNodeIdentitiesProvider creates and returns a new instance of +// ExecutionNodeIdentitiesProvider. +// +// Parameters: +// - log: The logger to use for logging. +// - state: The protocol state used for retrieving block information. +// - executionReceipts: A storage.ExecutionReceipts object that contains the execution receipts +// for blocks. +// - preferredENIdentifiers: A flow.IdentifierList of preferred execution node identifiers that +// are prioritized during selection. +// - fixedENIdentifiers: A flow.IdentifierList of fixed execution node identifiers that are +// always considered if available. +// +// No error returns are expected during normal operations. +func NewExecutionNodeIdentitiesProvider( + log zerolog.Logger, + state protocol.State, + executionReceipts storage.ExecutionReceipts, + preferredENIdentifiers flow.IdentifierList, + fixedENIdentifiers flow.IdentifierList, +) *ExecutionNodeIdentitiesProvider { + return &ExecutionNodeIdentitiesProvider{ + log: log, + executionReceipts: executionReceipts, + state: state, + preferredENIdentifiers: preferredENIdentifiers, + fixedENIdentifiers: fixedENIdentifiers, + } +} + // ExecutionNodesForBlockID returns upto maxNodesCnt number of randomly chosen execution node identities // which have executed the given block ID. // If no such execution node is found, an InsufficientExecutionReceipts error is returned. -func ExecutionNodesForBlockID( +func (e *ExecutionNodeIdentitiesProvider) ExecutionNodesForBlockID( ctx context.Context, blockID flow.Identifier, - executionReceipts storage.ExecutionReceipts, - state protocol.State, - log zerolog.Logger, - preferredENIdentifiers flow.IdentifierList, - fixedENIdentifiers flow.IdentifierList, ) (flow.IdentitySkeletonList, error) { var ( executorIDs flow.IdentifierList @@ -53,10 +90,10 @@ func ExecutionNodesForBlockID( // check if the block ID is of the root block. If it is then don't look for execution receipts since they // will not be present for the root block. - rootBlock := state.Params().FinalizedRoot() + rootBlock := e.state.Params().FinalizedRoot() if rootBlock.ID() == blockID { - executorIdentities, err := state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) + executorIdentities, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) if err != nil { return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err) } @@ -64,7 +101,7 @@ func ExecutionNodesForBlockID( } else { // try to find at least minExecutionNodesCnt execution node ids from the execution receipts for the given blockID for attempt := 0; attempt < maxAttemptsForExecutionReceipt; attempt++ { - executorIDs, err = findAllExecutionNodes(blockID, executionReceipts, log) + executorIDs, err = e.findAllExecutionNodes(blockID) if err != nil { return nil, err } @@ -74,7 +111,7 @@ func ExecutionNodesForBlockID( } // log the attempt - log.Debug().Int("attempt", attempt).Int("max_attempt", maxAttemptsForExecutionReceipt). + e.log.Debug().Int("attempt", attempt).Int("max_attempt", maxAttemptsForExecutionReceipt). Int("execution_receipts_found", len(executorIDs)). Str("block_id", blockID.String()). Msg("insufficient execution receipts") @@ -93,7 +130,7 @@ func ExecutionNodesForBlockID( receiptCnt := len(executorIDs) // if less than minExecutionNodesCnt execution receipts have been received so far, then return random ENs if receiptCnt < minExecutionNodesCnt { - newExecutorIDs, err := state.AtBlockID(blockID).Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) + newExecutorIDs, err := e.state.AtBlockID(blockID).Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) if err != nil { return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err) } @@ -102,7 +139,7 @@ func ExecutionNodesForBlockID( } // choose from the preferred or fixed execution nodes - subsetENs, err := chooseExecutionNodes(state, executorIDs, preferredENIdentifiers, fixedENIdentifiers) + subsetENs, err := e.chooseExecutionNodes(executorIDs) if err != nil { return nil, fmt.Errorf("failed to retreive execution IDs for block ID %v: %w", blockID, err) } @@ -116,13 +153,11 @@ func ExecutionNodesForBlockID( // findAllExecutionNodes find all the execution nodes ids from the execution receipts that have been received for the // given blockID -func findAllExecutionNodes( +func (e *ExecutionNodeIdentitiesProvider) findAllExecutionNodes( blockID flow.Identifier, - executionReceipts storage.ExecutionReceipts, - log zerolog.Logger, ) (flow.IdentifierList, error) { // lookup the receipt's storage with the block ID - allReceipts, err := executionReceipts.ByBlockID(blockID) + allReceipts, err := e.executionReceipts.ByBlockID(blockID) if err != nil { return nil, fmt.Errorf("failed to retreive execution receipts for block ID %v: %w", blockID, err) } @@ -150,7 +185,7 @@ func findAllExecutionNodes( // if there are more than one execution result for the same block ID, log as error if executionResultGroupedMetaList.NumberGroups() > 1 { identicalReceiptsStr := fmt.Sprintf("%v", flow.GetIDs(allReceipts)) - log.Error(). + e.log.Error(). Str("block_id", blockID.String()). Str("execution_receipts", identicalReceiptsStr). Msg("execution receipt mismatch") @@ -176,35 +211,32 @@ func findAllExecutionNodes( // If neither preferred nor fixed nodes are defined, then all execution node matching the executor IDs are returned. // e.g. If execution nodes in identity table are {1,2,3,4}, preferred ENs are defined as {2,3,4} // and the executor IDs is {1,2,3}, then {2, 3} is returned as the chosen subset of ENs -func chooseExecutionNodes( - state protocol.State, +func (e *ExecutionNodeIdentitiesProvider) chooseExecutionNodes( executorIDs flow.IdentifierList, - preferredENIdentifiers flow.IdentifierList, - fixedENIdentifiers flow.IdentifierList, ) (flow.IdentitySkeletonList, error) { - allENs, err := state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) + allENs, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution)) if err != nil { return nil, fmt.Errorf("failed to retrieve all execution IDs: %w", err) } // choose from preferred EN IDs - if len(preferredENIdentifiers) > 0 { - chosenIDs := ChooseFromPreferredENIDs(allENs, executorIDs, preferredENIdentifiers) + if len(e.preferredENIdentifiers) > 0 { + chosenIDs := e.ChooseFromPreferredENIDs(allENs, executorIDs) return chosenIDs.ToSkeleton(), nil } // if no preferred EN ID is found, then choose from the fixed EN IDs - if len(fixedENIdentifiers) > 0 { + if len(e.fixedENIdentifiers) > 0 { // choose fixed ENs which have executed the transaction chosenIDs := allENs.Filter(filter.And( - filter.HasNodeID[flow.Identity](fixedENIdentifiers...), + filter.HasNodeID[flow.Identity](e.fixedENIdentifiers...), filter.HasNodeID[flow.Identity](executorIDs...), )) if len(chosenIDs) > 0 { return chosenIDs.ToSkeleton(), nil } // if no such ENs are found, then just choose all fixed ENs - chosenIDs = allENs.Filter(filter.HasNodeID[flow.Identity](fixedENIdentifiers...)) + chosenIDs = allENs.Filter(filter.HasNodeID[flow.Identity](e.fixedENIdentifiers...)) return chosenIDs.ToSkeleton(), nil } @@ -218,15 +250,15 @@ func chooseExecutionNodes( // 1. Use any EN with a receipt. // 2. Use any preferred node not already selected. // 3. Use any EN not already selected. -func ChooseFromPreferredENIDs(allENs flow.IdentityList, +func (e *ExecutionNodeIdentitiesProvider) ChooseFromPreferredENIDs( + allENs flow.IdentityList, executorIDs flow.IdentifierList, - preferredENIdentifiers flow.IdentifierList, ) flow.IdentityList { var chosenIDs flow.IdentityList // filter for both preferred and executor IDs chosenIDs = allENs.Filter(filter.And( - filter.HasNodeID[flow.Identity](preferredENIdentifiers...), + filter.HasNodeID[flow.Identity](e.preferredENIdentifiers...), filter.HasNodeID[flow.Identity](executorIDs...), )) @@ -255,7 +287,7 @@ func ChooseFromPreferredENIDs(allENs flow.IdentityList, } // add any preferred node not already selected - preferredENs := allENs.Filter(filter.HasNodeID[flow.Identity](preferredENIdentifiers...)) + preferredENs := allENs.Filter(filter.HasNodeID[flow.Identity](e.preferredENIdentifiers...)) addIfNotExists(preferredENs) if len(chosenIDs) >= MaxNodesCnt { return chosenIDs diff --git a/engine/common/rpc/execution_node_identities_provider_test.go b/engine/common/rpc/execution_node_identities_provider_test.go new file mode 100644 index 00000000000..75171de4d20 --- /dev/null +++ b/engine/common/rpc/execution_node_identities_provider_test.go @@ -0,0 +1,325 @@ +package rpc_test + +import ( + "context" + "testing" + + "github.com/dgraph-io/badger/v2" + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + accessmock "github.com/onflow/flow-go/engine/access/mock" + "github.com/onflow/flow-go/engine/access/rpc/backend" + backendmock "github.com/onflow/flow-go/engine/access/rpc/backend/mock" + connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" + "github.com/onflow/flow-go/engine/common/version" + "github.com/onflow/flow-go/fvm/blueprints" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/counters" + protocol "github.com/onflow/flow-go/state/protocol/mock" + bstorage "github.com/onflow/flow-go/storage/badger" + storagemock "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/utils/unittest" +) + +type Suite struct { + suite.Suite + + state *protocol.State + snapshot *protocol.Snapshot + log zerolog.Logger + + blocks *storagemock.Blocks + headers *storagemock.Headers + collections *storagemock.Collections + transactions *storagemock.Transactions + receipts *storagemock.ExecutionReceipts + results *storagemock.ExecutionResults + transactionResults *storagemock.LightTransactionResults + events *storagemock.Events + txErrorMessages *storagemock.TransactionResultErrorMessages + + db *badger.DB + dbDir string + lastFullBlockHeight *counters.PersistentStrictMonotonicCounter + versionControl *version.VersionControl + + colClient *accessmock.AccessAPIClient + execClient *accessmock.ExecutionAPIClient + historicalAccessClient *accessmock.AccessAPIClient + + connectionFactory *connectionmock.ConnectionFactory + communicator *backendmock.Communicator + + chainID flow.ChainID + systemTx *flow.TransactionBody +} + +func TestHandler(t *testing.T) { + suite.Run(t, new(Suite)) +} + +func (suite *Suite) SetupTest() { + suite.log = zerolog.New(zerolog.NewConsoleWriter()) + suite.state = new(protocol.State) + suite.snapshot = new(protocol.Snapshot) + header := unittest.BlockHeaderFixture() + params := new(protocol.Params) + params.On("FinalizedRoot").Return(header, nil) + params.On("SporkID").Return(unittest.IdentifierFixture(), nil) + params.On("ProtocolVersion").Return(uint(unittest.Uint64InRange(10, 30)), nil) + params.On("SporkRootBlockHeight").Return(header.Height, nil) + params.On("SealedRoot").Return(header, nil) + suite.state.On("Params").Return(params) + + suite.blocks = new(storagemock.Blocks) + suite.headers = new(storagemock.Headers) + suite.transactions = new(storagemock.Transactions) + suite.collections = new(storagemock.Collections) + suite.receipts = new(storagemock.ExecutionReceipts) + suite.results = new(storagemock.ExecutionResults) + suite.colClient = new(accessmock.AccessAPIClient) + suite.execClient = new(accessmock.ExecutionAPIClient) + suite.transactionResults = storagemock.NewLightTransactionResults(suite.T()) + suite.events = storagemock.NewEvents(suite.T()) + suite.chainID = flow.Testnet + suite.historicalAccessClient = new(accessmock.AccessAPIClient) + suite.connectionFactory = connectionmock.NewConnectionFactory(suite.T()) + + suite.communicator = new(backendmock.Communicator) + + var err error + suite.systemTx, err = blueprints.SystemChunkTransaction(flow.Testnet.Chain()) + suite.Require().NoError(err) + + suite.db, suite.dbDir = unittest.TempBadgerDB(suite.T()) + suite.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( + bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressLastFullBlockHeight), + 0, + ) + suite.Require().NoError(err) +} + +// TestExecutionNodesForBlockID tests the ExecutionNodesForBlockID used for serving all API calls +// that need to talk to an execution node. +func (suite *Suite) TestExecutionNodesForBlockID() { + + totalReceipts := 5 + + block := unittest.BlockFixture() + + // generate one execution node identities for each receipt assuming that each ER is generated by a unique exec node + allExecutionNodes := unittest.IdentityListFixture(totalReceipts, unittest.WithRole(flow.RoleExecution)) + + // one execution result for all receipts for this block + executionResult := unittest.ExecutionResultFixture() + + // generate execution receipts + receipts := make(flow.ExecutionReceiptList, totalReceipts) + for j := 0; j < totalReceipts; j++ { + r := unittest.ReceiptForBlockFixture(&block) + r.ExecutorID = allExecutionNodes[j].NodeID + er := *executionResult + r.ExecutionResult = er + receipts[j] = r + } + + currentAttempt := 0 + attempt1Receipts, attempt2Receipts, attempt3Receipts := receipts, receipts, receipts + + // setup receipts storage mock to return different list of receipts on each call + suite.receipts. + On("ByBlockID", block.ID()).Return( + func(id flow.Identifier) flow.ExecutionReceiptList { + switch currentAttempt { + case 0: + currentAttempt++ + return attempt1Receipts + case 1: + currentAttempt++ + return attempt2Receipts + default: + currentAttempt = 0 + return attempt3Receipts + } + }, + func(id flow.Identifier) error { return nil }) + + suite.snapshot.On("Identities", mock.Anything).Return( + func(filter flow.IdentityFilter[flow.Identity]) flow.IdentityList { + // apply the filter passed in to the list of all the execution nodes + return allExecutionNodes.Filter(filter) + }, + func(flow.IdentityFilter[flow.Identity]) error { return nil }) + suite.state.On("Final").Return(suite.snapshot, nil).Maybe() + + var preferredENIdentifiers flow.IdentifierList + var fixedENIdentifiers flow.IdentifierList + + testExecutionNodesForBlockID := func(preferredENs, fixedENs, expectedENs flow.IdentityList) { + + if preferredENs != nil { + preferredENIdentifiers = preferredENs.NodeIDs() + } + if fixedENs != nil { + fixedENIdentifiers = fixedENs.NodeIDs() + } + + if expectedENs == nil { + expectedENs = flow.IdentityList{} + } + + execProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + suite.receipts, + preferredENIdentifiers, + fixedENIdentifiers, + ) + + allExecNodes, err := execProvider.ExecutionNodesForBlockID(context.Background(), block.ID()) + require.NoError(suite.T(), err) + + execNodeSelectorFactory := backend.NewNodeSelectorFactory(false) + execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) + require.NoError(suite.T(), err) + + actualList := flow.IdentitySkeletonList{} + for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { + actualList = append(actualList, actual) + } + + { + expectedENs := expectedENs.ToSkeleton() + if len(expectedENs) > commonrpc.MaxNodesCnt { + for _, actual := range actualList { + require.Contains(suite.T(), expectedENs, actual) + } + } else { + require.ElementsMatch(suite.T(), actualList, expectedENs) + } + } + } + // if we don't find sufficient receipts, ExecutionNodesForBlockID should return a list of random ENs + suite.Run("insufficient receipts return random ENs in State", func() { + // return no receipts at all attempts + attempt1Receipts = flow.ExecutionReceiptList{} + attempt2Receipts = flow.ExecutionReceiptList{} + attempt3Receipts = flow.ExecutionReceiptList{} + suite.state.On("AtBlockID", mock.Anything).Return(suite.snapshot) + + execProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + suite.receipts, + flow.IdentifierList{}, + flow.IdentifierList{}, + ) + + allExecNodes, err := execProvider.ExecutionNodesForBlockID(context.Background(), block.ID()) + require.NoError(suite.T(), err) + + execNodeSelectorFactory := backend.NewNodeSelectorFactory(false) + execSelector, err := execNodeSelectorFactory.SelectNodes(allExecNodes) + require.NoError(suite.T(), err) + + actualList := flow.IdentitySkeletonList{} + for actual := execSelector.Next(); actual != nil; actual = execSelector.Next() { + actualList = append(actualList, actual) + } + + require.Equal(suite.T(), len(actualList), commonrpc.MaxNodesCnt) + }) + + // if no preferred or fixed ENs are specified, the ExecutionNodesForBlockID function should + // return the exe node list without a filter + suite.Run("no preferred or fixed ENs", func() { + testExecutionNodesForBlockID(nil, nil, allExecutionNodes) + }) + // if only fixed ENs are specified, the ExecutionNodesForBlockID function should + // return the fixed ENs list + suite.Run("two fixed ENs with zero preferred EN", func() { + // mark the first two ENs as fixed + fixedENs := allExecutionNodes[0:2] + expectedList := fixedENs + testExecutionNodesForBlockID(nil, fixedENs, expectedList) + }) + // if only preferred ENs are specified, the ExecutionNodesForBlockID function should + // return the preferred ENs list + suite.Run("two preferred ENs with zero fixed EN", func() { + // mark the first two ENs as preferred + preferredENs := allExecutionNodes[0:2] + expectedList := allExecutionNodes[0:commonrpc.MaxNodesCnt] + testExecutionNodesForBlockID(preferredENs, nil, expectedList) + }) + // if both are specified, the ExecutionNodesForBlockID function should + // return the preferred ENs list + suite.Run("four fixed ENs of which two are preferred ENs", func() { + // mark the first four ENs as fixed + fixedENs := allExecutionNodes[0:5] + // mark the first two of the fixed ENs as preferred ENs + preferredENs := fixedENs[0:2] + expectedList := fixedENs[0:commonrpc.MaxNodesCnt] + testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList) + }) + // if both are specified, but the preferred ENs don't match the ExecutorIDs in the ER, + // the ExecutionNodesForBlockID function should return the fixed ENs list + suite.Run("four fixed ENs of which two are preferred ENs but have not generated the ER", func() { + // mark the first two ENs as fixed + fixedENs := allExecutionNodes[0:2] + // specify two ENs not specified in the ERs as preferred + preferredENs := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) + // add one more node ID besides of the fixed ENs list cause expected length of the list should be maxNodesCnt + expectedList := append(fixedENs, allExecutionNodes[2]) + testExecutionNodesForBlockID(preferredENs, fixedENs, expectedList) + }) + // if execution receipts are not yet available, the ExecutionNodesForBlockID function should retry twice + suite.Run("retry execution receipt query", func() { + // on first attempt, no execution receipts are available + attempt1Receipts = flow.ExecutionReceiptList{} + // on second attempt ony one is available + attempt2Receipts = flow.ExecutionReceiptList{receipts[0]} + // on third attempt all receipts are available + attempt3Receipts = receipts + currentAttempt = 0 + // mark the first two ENs as preferred + preferredENs := allExecutionNodes[0:2] + expectedList := allExecutionNodes[0:commonrpc.MaxNodesCnt] + testExecutionNodesForBlockID(preferredENs, nil, expectedList) + }) + // if preferredENIdentifiers was set and there are less than maxNodesCnt nodes selected than check the order + // of adding ENs ids + suite.Run("add nodes in the correct order", func() { + // mark the first EN as preferred + preferredENIdentifiers = allExecutionNodes[0:1].NodeIDs() + // mark the fourth EN with receipt + executorIDs := allExecutionNodes[3:4].NodeIDs() + + receiptNodes := allExecutionNodes[3:4] // any EN with a receipt + preferredNodes := allExecutionNodes[0:1] // preferred EN node not already selected + additionalNode := allExecutionNodes[1:2] // any EN not already selected + + expectedOrder := flow.IdentityList{ + receiptNodes[0], + preferredNodes[0], + additionalNode[0], + } + + execProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + suite.receipts, + preferredENIdentifiers, + flow.IdentifierList{}, + ) + + chosenIDs := execProvider.ChooseFromPreferredENIDs(allExecutionNodes, executorIDs) + + require.ElementsMatch(suite.T(), chosenIDs, expectedOrder) + require.Equal(suite.T(), len(chosenIDs), commonrpc.MaxNodesCnt) + }) +} From a3b1fedd0f61fa44ea1cc5ee3fe2a3e7c353833c Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Wed, 25 Sep 2024 22:32:19 +0300 Subject: [PATCH 2/6] Moved ExecutionNodeIdentitiesProvider to node builder, updated tests --- .../node_builder/access_node_builder.go | 85 ++++++----- cmd/observer/node_builder/observer_builder.go | 56 ++++--- engine/access/access_test.go | 137 ++++++++++-------- engine/access/ingestion/engine.go | 15 +- engine/access/ingestion/engine_test.go | 44 +++--- engine/access/rpc/backend/backend.go | 85 +++++------ engine/access/rpc/backend/backend_test.go | 49 +++++-- .../rpc/backend/backend_transactions_test.go | 31 ++-- 8 files changed, 287 insertions(+), 215 deletions(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index b1ec3793ebe..e4c77374f8c 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -55,6 +55,7 @@ import ( "github.com/onflow/flow-go/engine/access/subscription" followereng "github.com/onflow/flow-go/engine/common/follower" "github.com/onflow/flow-go/engine/common/requester" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/stop" synceng "github.com/onflow/flow-go/engine/common/synchronization" "github.com/onflow/flow-go/engine/common/version" @@ -349,6 +350,8 @@ type FlowAccessNodeBuilder struct { stateStreamBackend *statestreambackend.StateStreamBackend nodeBackend *backend.Backend + + ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder { @@ -1926,33 +1929,49 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return nil, fmt.Errorf("transaction result query mode 'compare' is not supported") } + preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) + } + + fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) + } + + builder.ExecNodeIdentitiesProvider = commonrpc.NewExecutionNodeIdentitiesProvider( + node.Logger, + node.State, + node.Storage.Receipts, + preferredENIdentifiers, + fixedENIdentifiers, + ) + builder.nodeBackend, err = backend.New(backend.Params{ - State: node.State, - CollectionRPC: builder.CollectionRPC, - HistoricalAccessNodes: builder.HistoricalAccessRPCs, - Blocks: node.Storage.Blocks, - Headers: node.Storage.Headers, - Collections: node.Storage.Collections, - Transactions: node.Storage.Transactions, - ExecutionReceipts: node.Storage.Receipts, - ExecutionResults: node.Storage.Results, - TxResultErrorMessages: node.Storage.TransactionResultErrorMessages, - ChainID: node.RootChainID, - AccessMetrics: builder.AccessMetrics, - ConnFactory: connFactory, - RetryEnabled: builder.retryEnabled, - MaxHeightRange: backendConfig.MaxHeightRange, - PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs, - FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs, - Log: node.Logger, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled), - TxResultCacheSize: builder.TxResultCacheSize, - ScriptExecutor: builder.ScriptExecutor, - ScriptExecutionMode: scriptExecMode, - CheckPayerBalance: builder.checkPayerBalance, - EventQueryMode: eventQueryMode, - BlockTracker: blockTracker, + State: node.State, + CollectionRPC: builder.CollectionRPC, + HistoricalAccessNodes: builder.HistoricalAccessRPCs, + Blocks: node.Storage.Blocks, + Headers: node.Storage.Headers, + Collections: node.Storage.Collections, + Transactions: node.Storage.Transactions, + ExecutionReceipts: node.Storage.Receipts, + ExecutionResults: node.Storage.Results, + TxResultErrorMessages: node.Storage.TransactionResultErrorMessages, + ChainID: node.RootChainID, + AccessMetrics: builder.AccessMetrics, + ConnFactory: connFactory, + RetryEnabled: builder.retryEnabled, + MaxHeightRange: backendConfig.MaxHeightRange, + Log: node.Logger, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled), + TxResultCacheSize: builder.TxResultCacheSize, + ScriptExecutor: builder.ScriptExecutor, + ScriptExecutionMode: scriptExecMode, + CheckPayerBalance: builder.checkPayerBalance, + EventQueryMode: eventQueryMode, + BlockTracker: blockTracker, SubscriptionHandler: subscription.NewSubscriptionHandler( builder.Logger, broadcaster, @@ -1960,11 +1979,12 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.stateStreamConf.ResponseLimit, builder.stateStreamConf.ClientSendBufferSize, ), - EventsIndex: builder.EventsIndex, - TxResultQueryMode: txResultQueryMode, - TxResultsIndex: builder.TxResultsIndex, - LastFullBlockHeight: lastFullBlockHeight, - VersionControl: builder.VersionControl, + EventsIndex: builder.EventsIndex, + TxResultQueryMode: txResultQueryMode, + TxResultsIndex: builder.TxResultsIndex, + LastFullBlockHeight: lastFullBlockHeight, + VersionControl: builder.VersionControl, + ExecNodeIdentitiesProvider: builder.ExecNodeIdentitiesProvider, }) if err != nil { return nil, fmt.Errorf("could not initialize backend: %w", err) @@ -2041,8 +2061,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { processedBlockHeight, lastFullBlockHeight, builder.nodeBackend, - builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs, - builder.rpcConf.BackendConfig.FixedExecutionNodeIDs, + builder.ExecNodeIdentitiesProvider, ) if err != nil { return nil, err diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index a1d1dbcf6a1..c0f65987ba7 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -54,6 +54,7 @@ import ( statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/engine/common/follower" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/stop" synceng "github.com/onflow/flow-go/engine/common/synchronization" "github.com/onflow/flow-go/engine/common/version" @@ -1937,25 +1938,41 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { return nil, fmt.Errorf("failed to initialize block tracker: %w", err) } + preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) + } + + fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs) + if err != nil { + return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) + } + + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + node.Logger, + node.State, + node.Storage.Receipts, + preferredENIdentifiers, + fixedENIdentifiers, + ) + backendParams := backend.Params{ - State: node.State, - Blocks: node.Storage.Blocks, - Headers: node.Storage.Headers, - Collections: node.Storage.Collections, - Transactions: node.Storage.Transactions, - ExecutionReceipts: node.Storage.Receipts, - ExecutionResults: node.Storage.Results, - ChainID: node.RootChainID, - AccessMetrics: accessMetrics, - ConnFactory: connFactory, - RetryEnabled: false, - MaxHeightRange: backendConfig.MaxHeightRange, - PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs, - FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs, - Log: node.Logger, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled), - BlockTracker: blockTracker, + State: node.State, + Blocks: node.Storage.Blocks, + Headers: node.Storage.Headers, + Collections: node.Storage.Collections, + Transactions: node.Storage.Transactions, + ExecutionReceipts: node.Storage.Receipts, + ExecutionResults: node.Storage.Results, + ChainID: node.RootChainID, + AccessMetrics: accessMetrics, + ConnFactory: connFactory, + RetryEnabled: false, + MaxHeightRange: backendConfig.MaxHeightRange, + Log: node.Logger, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled), + BlockTracker: blockTracker, SubscriptionHandler: subscription.NewSubscriptionHandler( builder.Logger, broadcaster, @@ -1963,7 +1980,8 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.stateStreamConf.ResponseLimit, builder.stateStreamConf.ClientSendBufferSize, ), - VersionControl: builder.VersionControl, + VersionControl: builder.VersionControl, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, } if builder.localServiceAPIEnabled { diff --git a/engine/access/access_test.go b/engine/access/access_test.go index 7aa7aa3d07e..080bdd9d006 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -26,6 +26,7 @@ import ( "github.com/onflow/flow-go/engine/access/rpc/backend" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" "github.com/onflow/flow-go/engine/access/subscription" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/factory" @@ -642,23 +643,32 @@ func (suite *Suite) TestGetSealedTransaction() { blocksToMarkExecuted, err := stdmap.NewTimes(100) require.NoError(suite.T(), err) - bnd, err := backend.New(backend.Params{State: suite.state, - CollectionRPC: suite.collClient, - Blocks: all.Blocks, - Headers: all.Headers, - Collections: collections, - Transactions: transactions, - ExecutionReceipts: receipts, - ExecutionResults: results, - ChainID: suite.chainID, - AccessMetrics: suite.metrics, - ConnFactory: connFactory, - MaxHeightRange: backend.DefaultMaxHeightRange, - PreferredExecutionNodeIDs: enNodeIDs.Strings(), - Log: suite.log, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(false), - TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + receipts, + enNodeIDs, + nil, + ) + + bnd, err := backend.New(backend.Params{ + State: suite.state, + CollectionRPC: suite.collClient, + Blocks: all.Blocks, + Headers: all.Headers, + Collections: collections, + Transactions: transactions, + ExecutionReceipts: receipts, + ExecutionResults: results, + ChainID: suite.chainID, + AccessMetrics: suite.metrics, + ConnFactory: connFactory, + MaxHeightRange: backend.DefaultMaxHeightRange, + Log: suite.log, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(false), + TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, }) require.NoError(suite.T(), err) @@ -701,8 +711,7 @@ func (suite *Suite) TestGetSealedTransaction() { processedHeight, lastFullBlockHeight, bnd, - enNodeIDs.Strings(), - nil, + execNodeIdentitiesProvider, ) require.NoError(suite.T(), err) @@ -821,23 +830,31 @@ func (suite *Suite) TestGetTransactionResult() { blocksToMarkExecuted, err := stdmap.NewTimes(100) require.NoError(suite.T(), err) + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + receipts, + enNodeIDs, + nil, + ) + bnd, err := backend.New(backend.Params{State: suite.state, - CollectionRPC: suite.collClient, - Blocks: all.Blocks, - Headers: all.Headers, - Collections: collections, - Transactions: transactions, - ExecutionReceipts: receipts, - ExecutionResults: results, - ChainID: suite.chainID, - AccessMetrics: suite.metrics, - ConnFactory: connFactory, - MaxHeightRange: backend.DefaultMaxHeightRange, - PreferredExecutionNodeIDs: enNodeIDs.Strings(), - Log: suite.log, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(false), - TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + CollectionRPC: suite.collClient, + Blocks: all.Blocks, + Headers: all.Headers, + Collections: collections, + Transactions: transactions, + ExecutionReceipts: receipts, + ExecutionResults: results, + ChainID: suite.chainID, + AccessMetrics: suite.metrics, + ConnFactory: connFactory, + MaxHeightRange: backend.DefaultMaxHeightRange, + Log: suite.log, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(false), + TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, }) require.NoError(suite.T(), err) @@ -879,8 +896,7 @@ func (suite *Suite) TestGetTransactionResult() { processedHeight, lastFullBlockHeight, bnd, - enNodeIDs.Strings(), - nil, + execNodeIdentitiesProvider, ) require.NoError(suite.T(), err) @@ -1054,26 +1070,34 @@ func (suite *Suite) TestExecuteScript() { enIdentities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) enNodeIDs := enIdentities.NodeIDs() + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + receipts, + nil, + enNodeIDs, + ) + var err error suite.backend, err = backend.New(backend.Params{ - State: suite.state, - CollectionRPC: suite.collClient, - Blocks: all.Blocks, - Headers: all.Headers, - Collections: collections, - Transactions: transactions, - ExecutionReceipts: receipts, - ExecutionResults: results, - ChainID: suite.chainID, - AccessMetrics: suite.metrics, - ConnFactory: connFactory, - MaxHeightRange: backend.DefaultMaxHeightRange, - FixedExecutionNodeIDs: (identities.NodeIDs()).Strings(), - Log: suite.log, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(false), - ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, - TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + State: suite.state, + CollectionRPC: suite.collClient, + Blocks: all.Blocks, + Headers: all.Headers, + Collections: collections, + Transactions: transactions, + ExecutionReceipts: receipts, + ExecutionResults: results, + ChainID: suite.chainID, + AccessMetrics: suite.metrics, + ConnFactory: connFactory, + MaxHeightRange: backend.DefaultMaxHeightRange, + Log: suite.log, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(false), + ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, + TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + ExecNodeIdentitiesProvider: execNodeIdentitiesProvider, }) require.NoError(suite.T(), err) @@ -1128,8 +1152,7 @@ func (suite *Suite) TestExecuteScript() { processedHeight, lastFullBlockHeight, suite.backend, - enNodeIDs.Strings(), - nil, + execNodeIdentitiesProvider, ) require.NoError(suite.T(), err) diff --git a/engine/access/ingestion/engine.go b/engine/access/ingestion/engine.go index 8cf2479666d..4e9a598cc30 100644 --- a/engine/access/ingestion/engine.go +++ b/engine/access/ingestion/engine.go @@ -134,8 +134,7 @@ func New( processedHeight storage.ConsumerProgress, lastFullBlockHeight *counters.PersistentStrictMonotonicCounter, backend *backend.Backend, - preferredExecutionNodeIDs []string, - fixedExecutionNodeIDs []string, + execProvider *commonrpc.ExecutionNodeIdentitiesProvider, ) (*Engine, error) { executionReceiptsRawQueue, err := fifoqueue.NewFifoQueue(defaultQueueCapacity) if err != nil { @@ -158,16 +157,6 @@ func New( collectionExecutedMetric.UpdateLastFullBlockHeight(lastFullBlockHeight.Value()) - preferredENIdentifiers, err := commonrpc.IdentifierList(preferredExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) - } - - fixedENIdentifiers, err := commonrpc.IdentifierList(fixedExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) - } - // initialize the propagation engine with its dependencies e := &Engine{ log: log.With().Str("engine", "ingestion").Logger(), @@ -192,7 +181,7 @@ func New( executionReceiptsQueue: executionReceiptsQueue, messageHandler: messageHandler, backend: backend, - execProvider: commonrpc.NewExecutionNodeIdentitiesProvider(log, state, executionReceipts, preferredENIdentifiers, fixedENIdentifiers), + execProvider: execProvider, } // jobqueue Jobs object that tracks finalized blocks by height. This is used by the finalizedBlockConsumer diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go index 148dbb07af5..5a07d97ac7a 100644 --- a/engine/access/ingestion/engine_test.go +++ b/engine/access/ingestion/engine_test.go @@ -20,6 +20,7 @@ import ( accessmock "github.com/onflow/flow-go/engine/access/mock" "github.com/onflow/flow-go/engine/access/rpc/backend" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" "github.com/onflow/flow-go/module" @@ -86,6 +87,8 @@ type Suite struct { db *badger.DB dbDir string lastFullBlockHeight *counters.PersistentStrictMonotonicCounter + + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } func TestIngestEngine(t *testing.T) { @@ -188,6 +191,14 @@ func (s *Suite) SetupTest() { s.blocks, ) require.NoError(s.T(), err) + + s.execNodeIdentitiesProvider = commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.proto.state, + s.receipts, + nil, + s.enNodeIDs, + ) } // initIngestionEngine create new instance of ingestion engine and waits when it starts @@ -218,8 +229,7 @@ func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine { processedHeight, s.lastFullBlockHeight, s.backend, - s.enNodeIDs.Strings(), - nil, + s.execNodeIdentitiesProvider, ) require.NoError(s.T(), err) @@ -858,21 +868,21 @@ func (s *Suite) TestTransactionResultErrorMessagesAreFetched() { // Initialize the backend with the mocked state, blocks, headers, transactions, etc. var err error s.backend, err = backend.New(backend.Params{ - State: s.proto.state, - Blocks: s.blocks, - Headers: s.headers, - Transactions: s.transactions, - ExecutionReceipts: s.receipts, - ExecutionResults: s.results, - ConnFactory: connFactory, - MaxHeightRange: backend.DefaultMaxHeightRange, - FixedExecutionNodeIDs: s.enNodeIDs.Strings(), - Log: s.log, - SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, - Communicator: backend.NewNodeCommunicator(false), - ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, - TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, - ChainID: flow.Testnet, + State: s.proto.state, + Blocks: s.blocks, + Headers: s.headers, + Transactions: s.transactions, + ExecutionReceipts: s.receipts, + ExecutionResults: s.results, + ConnFactory: connFactory, + MaxHeightRange: backend.DefaultMaxHeightRange, + Log: s.log, + SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit, + Communicator: backend.NewNodeCommunicator(false), + ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly, + TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly, + ChainID: flow.Testnet, + ExecNodeIdentitiesProvider: s.execNodeIdentitiesProvider, }) require.NoError(s.T(), err) diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index c802561ec2e..83e1f8731e9 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -77,39 +77,38 @@ type Backend struct { } type Params struct { - State protocol.State - CollectionRPC accessproto.AccessAPIClient - HistoricalAccessNodes []accessproto.AccessAPIClient - Blocks storage.Blocks - Headers storage.Headers - Collections storage.Collections - Transactions storage.Transactions - ExecutionReceipts storage.ExecutionReceipts - ExecutionResults storage.ExecutionResults - TxResultErrorMessages storage.TransactionResultErrorMessages - ChainID flow.ChainID - AccessMetrics module.AccessMetrics - ConnFactory connection.ConnectionFactory - RetryEnabled bool - MaxHeightRange uint - PreferredExecutionNodeIDs []string - FixedExecutionNodeIDs []string - Log zerolog.Logger - SnapshotHistoryLimit int - Communicator Communicator - TxResultCacheSize uint - ScriptExecutor execution.ScriptExecutor - ScriptExecutionMode IndexQueryMode - CheckPayerBalance bool - EventQueryMode IndexQueryMode - BlockTracker subscription.BlockTracker - SubscriptionHandler *subscription.SubscriptionHandler - - EventsIndex *index.EventsIndex - TxResultQueryMode IndexQueryMode - TxResultsIndex *index.TransactionResultsIndex - LastFullBlockHeight *counters.PersistentStrictMonotonicCounter - VersionControl *version.VersionControl + State protocol.State + CollectionRPC accessproto.AccessAPIClient + HistoricalAccessNodes []accessproto.AccessAPIClient + Blocks storage.Blocks + Headers storage.Headers + Collections storage.Collections + Transactions storage.Transactions + ExecutionReceipts storage.ExecutionReceipts + ExecutionResults storage.ExecutionResults + TxResultErrorMessages storage.TransactionResultErrorMessages + ChainID flow.ChainID + AccessMetrics module.AccessMetrics + ConnFactory connection.ConnectionFactory + RetryEnabled bool + MaxHeightRange uint + Log zerolog.Logger + SnapshotHistoryLimit int + Communicator Communicator + TxResultCacheSize uint + ScriptExecutor execution.ScriptExecutor + ScriptExecutionMode IndexQueryMode + CheckPayerBalance bool + EventQueryMode IndexQueryMode + BlockTracker subscription.BlockTracker + SubscriptionHandler *subscription.SubscriptionHandler + + EventsIndex *index.EventsIndex + TxResultQueryMode IndexQueryMode + TxResultsIndex *index.TransactionResultsIndex + LastFullBlockHeight *counters.PersistentStrictMonotonicCounter + VersionControl *version.VersionControl + ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } var _ TransactionErrorMessage = (*Backend)(nil) @@ -141,18 +140,6 @@ func New(params Params) (*Backend, error) { } systemTxID := systemTx.ID() - preferredENIdentifiers, err := commonrpc.IdentifierList(params.PreferredExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err) - } - - fixedENIdentifiers, err := commonrpc.IdentifierList(params.FixedExecutionNodeIDs) - if err != nil { - return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err) - } - - execProvider := commonrpc.NewExecutionNodeIdentitiesProvider(params.Log, params.State, params.ExecutionReceipts, preferredENIdentifiers, fixedENIdentifiers) - transactionsLocalDataProvider := &TransactionsLocalDataProvider{ state: params.State, collections: params.Collections, @@ -177,7 +164,7 @@ func New(params Params) (*Backend, error) { nodeCommunicator: params.Communicator, scriptExecutor: params.ScriptExecutor, scriptExecMode: params.ScriptExecutionMode, - execProvider: execProvider, + execProvider: params.ExecNodeIdentitiesProvider, }, backendEvents: backendEvents{ log: params.Log, @@ -189,7 +176,7 @@ func New(params Params) (*Backend, error) { nodeCommunicator: params.Communicator, queryMode: params.EventQueryMode, eventsIndex: params.EventsIndex, - execProvider: execProvider, + execProvider: params.ExecNodeIdentitiesProvider, }, backendBlockHeaders: backendBlockHeaders{ headers: params.Headers, @@ -207,7 +194,7 @@ func New(params Params) (*Backend, error) { nodeCommunicator: params.Communicator, scriptExecutor: params.ScriptExecutor, scriptExecMode: params.ScriptExecutionMode, - execProvider: execProvider, + execProvider: params.ExecNodeIdentitiesProvider, }, backendExecutionResults: backendExecutionResults{ executionResults: params.ExecutionResults, @@ -257,7 +244,7 @@ func New(params Params) (*Backend, error) { txResultQueryMode: params.TxResultQueryMode, systemTx: systemTx, systemTxID: systemTxID, - execProvider: execProvider, + execProvider: params.ExecNodeIdentitiesProvider, } // TODO: The TransactionErrorMessage interface should be reorganized in future, as it is implemented in backendTransactions but used in TransactionsLocalDataProvider, and its initialization is somewhat quirky. diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index c78136632ba..53260fcc936 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -30,6 +30,7 @@ import ( backendmock "github.com/onflow/flow-go/engine/access/rpc/backend/mock" "github.com/onflow/flow-go/engine/access/rpc/connection" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/engine/common/version" "github.com/onflow/flow-go/fvm/blueprints" @@ -89,6 +90,9 @@ type Suite struct { chainID flow.ChainID systemTx *flow.TransactionBody + + fixedExecutionNodeIDs flow.IdentifierList + preferredExecutionNodeIDs flow.IdentifierList } func TestHandler(t *testing.T) { @@ -921,10 +925,11 @@ func (suite *Suite) TestGetTransactionResultByIndex() { Events: nil, } + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -962,7 +967,6 @@ func (suite *Suite) TestGetTransactionResultsByBlockID() { suite.state.On("Sealed").Return(suite.snapshot, nil).Maybe() ctx := context.Background() - params := suite.defaultBackendParams() block := unittest.BlockFixture() sporkRootBlockHeight := suite.state.Params().SporkRootBlockHeight() @@ -986,9 +990,11 @@ func (suite *Suite) TestGetTransactionResultsByBlockID() { TransactionResults: []*execproto.GetTransactionResultResponse{{}}, } + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1076,10 +1082,11 @@ func (suite *Suite) TestTransactionStatusTransition() { Events: nil, } + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1332,7 +1339,7 @@ func (suite *Suite) TestTransactionPendingToFinalizedStatusTransition() { params := suite.defaultBackendParams() params.ConnFactory = connFactory params.MaxHeightRange = TEST_MAX_HEIGHT - params.PreferredExecutionNodeIDs = []string{receipts[0].ExecutorID.String()} + suite.preferredExecutionNodeIDs = flow.IdentifierList{receipts[0].ExecutorID} backend, err := New(params) suite.Require().NoError(err) @@ -1471,10 +1478,11 @@ func (suite *Suite) TestGetExecutionResultByID() { Return(executionResult, nil) suite.Run("nonexisting execution result for id", func() { + suite.fixedExecutionNodeIDs = validENIDs + params := suite.defaultBackendParams() params.ExecutionResults = results params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = validENIDs.Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1486,10 +1494,11 @@ func (suite *Suite) TestGetExecutionResultByID() { }) suite.Run("existing execution result id", func() { + suite.fixedExecutionNodeIDs = validENIDs + params := suite.defaultBackendParams() params.ExecutionResults = results params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = validENIDs.Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1533,10 +1542,11 @@ func (suite *Suite) TestGetExecutionResultByBlockID() { Return(executionResult, nil) suite.Run("nonexisting execution results", func() { + suite.fixedExecutionNodeIDs = validENIDs + params := suite.defaultBackendParams() params.ExecutionResults = results params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = validENIDs.Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1548,10 +1558,11 @@ func (suite *Suite) TestGetExecutionResultByBlockID() { }) suite.Run("existing execution results", func() { + suite.fixedExecutionNodeIDs = validENIDs + params := suite.defaultBackendParams() params.ExecutionResults = results params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = validENIDs.Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1762,10 +1773,11 @@ func (suite *Suite) TestGetTransactionResultEventEncodingVersion() { suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1824,10 +1836,11 @@ func (suite *Suite) TestGetTransactionResultByIndexAndBlockIdEventEncodingVersio suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() backend, err := New(params) suite.Require().NoError(err) @@ -1924,12 +1937,13 @@ func (suite *Suite) TestNodeCommunicator() { BlockId: blockId[:], } + // Left only one preferred execution node + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + suite.preferredExecutionNodeIDs = flow.IdentifierList{fixedENIDs[0].NodeID} + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = (fixedENIDs.NodeIDs()).Strings() - // Left only one preferred execution node - params.PreferredExecutionNodeIDs = []string{fixedENIDs[0].NodeID.String()} backend, err := New(params) suite.Require().NoError(err) @@ -2022,5 +2036,12 @@ func (suite *Suite) defaultBackendParams() Params { TxResultQueryMode: IndexQueryModeExecutionNodesOnly, LastFullBlockHeight: suite.lastFullBlockHeight, VersionControl: suite.versionControl, + ExecNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + suite.log, + suite.state, + suite.receipts, + suite.preferredExecutionNodeIDs, + suite.fixedExecutionNodeIDs, + ), } } diff --git a/engine/access/rpc/backend/backend_transactions_test.go b/engine/access/rpc/backend/backend_transactions_test.go index 06ff0454a14..4153e18db47 100644 --- a/engine/access/rpc/backend/backend_transactions_test.go +++ b/engine/access/rpc/backend/backend_transactions_test.go @@ -355,6 +355,8 @@ func (suite *Suite) TestLookupTransactionErrorMessageByTransactionID_HappyPath() suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() params.TxResultErrorMessages = suite.txErrorMessages @@ -362,7 +364,6 @@ func (suite *Suite) TestLookupTransactionErrorMessageByTransactionID_HappyPath() suite.Run("happy path from EN", func() { // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Mock the cache lookup for the transaction error message, returning "not found". suite.txErrorMessages.On("ByBlockIDTransactionID", blockId, failedTxId). @@ -434,11 +435,12 @@ func (suite *Suite) TestLookupTransactionErrorMessageByTransactionID_FailedToFet reporter.On("LowestIndexedHeight").Return(block.Header.Height, nil) reporter.On("HighestIndexedHeight").Return(block.Header.Height+10, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // The connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() // Initialize the transaction results index with the mock reporter. - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() params.TxResultsIndex = index.NewTransactionResultsIndex(index.NewReporter(), suite.transactionResults) err := params.TxResultsIndex.Initialize(reporter) suite.Require().NoError(err) @@ -550,6 +552,8 @@ func (suite *Suite) TestLookupTransactionErrorMessageByIndex_HappyPath() { suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() params.TxResultErrorMessages = suite.txErrorMessages @@ -557,7 +561,6 @@ func (suite *Suite) TestLookupTransactionErrorMessageByIndex_HappyPath() { suite.Run("happy path from EN", func() { // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Mock the cache lookup for the transaction error message, returning "not found". suite.txErrorMessages.On("ByBlockIDTransactionIndex", blockId, failedTxIndex). @@ -633,10 +636,11 @@ func (suite *Suite) TestLookupTransactionErrorMessageByIndex_FailedToFetch() { reporter.On("LowestIndexedHeight").Return(block.Header.Height, nil) reporter.On("HighestIndexedHeight").Return(block.Header.Height+10, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = connFactory - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Initialize the transaction results index with the mock reporter. params.TxResultsIndex = index.NewTransactionResultsIndex(index.NewReporter(), suite.transactionResults) err := params.TxResultsIndex.Initialize(reporter) @@ -750,6 +754,8 @@ func (suite *Suite) TestLookupTransactionErrorMessagesByBlockID_HappyPath() { suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Identities", mock.Anything).Return(fixedENIDs, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() params.TxResultErrorMessages = suite.txErrorMessages @@ -757,7 +763,6 @@ func (suite *Suite) TestLookupTransactionErrorMessagesByBlockID_HappyPath() { suite.Run("happy path from EN", func() { // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Mock the cache lookup for the transaction error messages, returning "not found". suite.txErrorMessages.On("ByBlockID", blockId). @@ -853,10 +858,11 @@ func (suite *Suite) TestLookupTransactionErrorMessagesByBlockID_FailedToFetch() reporter.On("LowestIndexedHeight").Return(block.Header.Height, nil) reporter.On("HighestIndexedHeight").Return(block.Header.Height+10, nil) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() // Initialize the transaction results index with the mock reporter. params.TxResultsIndex = index.NewTransactionResultsIndex(index.NewReporter(), suite.transactionResults) err := params.TxResultsIndex.Initialize(reporter) @@ -1349,13 +1355,13 @@ func (suite *Suite) TestTransactionResultFromStorage() { err := indexReporter.Initialize(reporter) suite.Require().NoError(err) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + // Set up the backend parameters and the backend instance params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() params.TxResultQueryMode = IndexQueryModeLocalOnly - params.EventsIndex = index.NewEventsIndex(indexReporter, suite.events) params.TxResultsIndex = index.NewTransactionResultsIndex(indexReporter, suite.transactionResults) @@ -1436,13 +1442,13 @@ func (suite *Suite) TestTransactionByIndexFromStorage() { err := indexReporter.Initialize(reporter) suite.Require().NoError(err) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + // Set up the backend parameters and the backend instance params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() params.TxResultQueryMode = IndexQueryModeLocalOnly - params.EventsIndex = index.NewEventsIndex(indexReporter, suite.events) params.TxResultsIndex = index.NewTransactionResultsIndex(indexReporter, suite.transactionResults) @@ -1529,15 +1535,14 @@ func (suite *Suite) TestTransactionResultsByBlockIDFromStorage() { err := indexReporter.Initialize(reporter) suite.Require().NoError(err) + suite.fixedExecutionNodeIDs = fixedENIDs.NodeIDs() + // Set up the state and snapshot mocks and the backend instance params := suite.defaultBackendParams() // the connection factory should be used to get the execution node client params.ConnFactory = suite.setupConnectionFactory() - params.FixedExecutionNodeIDs = fixedENIDs.NodeIDs().Strings() - params.EventsIndex = index.NewEventsIndex(indexReporter, suite.events) params.TxResultsIndex = index.NewTransactionResultsIndex(indexReporter, suite.transactionResults) - params.TxResultQueryMode = IndexQueryModeLocalOnly backend, err := New(params) From 69ea579a102312f92505721ab0649838fcc75e4b Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Wed, 25 Sep 2024 22:51:40 +0300 Subject: [PATCH 3/6] Renamed field to execNodeIdentitiesProvider --- engine/access/ingestion/engine.go | 20 +++---- engine/access/rpc/backend/backend.go | 58 +++++++++---------- engine/access/rpc/backend/backend_accounts.go | 18 +++--- .../rpc/backend/backend_accounts_test.go | 2 +- engine/access/rpc/backend/backend_events.go | 22 +++---- .../access/rpc/backend/backend_events_test.go | 2 +- engine/access/rpc/backend/backend_scripts.go | 22 +++---- .../rpc/backend/backend_scripts_test.go | 2 +- .../rpc/backend/backend_transactions.go | 18 +++--- ...execution_node_identities_provider_test.go | 12 ++-- 10 files changed, 88 insertions(+), 88 deletions(-) diff --git a/engine/access/ingestion/engine.go b/engine/access/ingestion/engine.go index 4e9a598cc30..2f94d2becf7 100644 --- a/engine/access/ingestion/engine.go +++ b/engine/access/ingestion/engine.go @@ -108,8 +108,8 @@ type Engine struct { // metrics collectionExecutedMetric module.CollectionExecutedMetric - execProvider *commonrpc.ExecutionNodeIdentitiesProvider - backend *backend.Backend + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider + backend *backend.Backend } var _ network.MessageProcessor = (*Engine)(nil) @@ -134,7 +134,7 @@ func New( processedHeight storage.ConsumerProgress, lastFullBlockHeight *counters.PersistentStrictMonotonicCounter, backend *backend.Backend, - execProvider *commonrpc.ExecutionNodeIdentitiesProvider, + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider, ) (*Engine, error) { executionReceiptsRawQueue, err := fifoqueue.NewFifoQueue(defaultQueueCapacity) if err != nil { @@ -176,12 +176,12 @@ func New( lastFullBlockHeight: lastFullBlockHeight, // queue / notifier for execution receipts - executionReceiptsNotifier: engine.NewNotifier(), - txResultErrorMessagesChan: make(chan flow.Identifier, 1), - executionReceiptsQueue: executionReceiptsQueue, - messageHandler: messageHandler, - backend: backend, - execProvider: execProvider, + executionReceiptsNotifier: engine.NewNotifier(), + txResultErrorMessagesChan: make(chan flow.Identifier, 1), + executionReceiptsQueue: executionReceiptsQueue, + messageHandler: messageHandler, + backend: backend, + execNodeIdentitiesProvider: execNodeIdentitiesProvider, } // jobqueue Jobs object that tracks finalized blocks by height. This is used by the finalizedBlockConsumer @@ -407,7 +407,7 @@ func (e *Engine) handleTransactionResultErrorMessages(ctx context.Context, block // retrieves error messages from the backend if they do not already exist in storage if !exists { - execNodes, err := e.execProvider.ExecutionNodesForBlockID( + execNodes, err := e.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, ) diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 83e1f8731e9..28b9ef03740 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -155,28 +155,28 @@ func New(params Params) (*Backend, error) { BlockTracker: params.BlockTracker, // create the sub-backends backendScripts: backendScripts{ - log: params.Log, - headers: params.Headers, - connFactory: params.ConnFactory, - state: params.State, - metrics: params.AccessMetrics, - loggedScripts: loggedScripts, - nodeCommunicator: params.Communicator, - scriptExecutor: params.ScriptExecutor, - scriptExecMode: params.ScriptExecutionMode, - execProvider: params.ExecNodeIdentitiesProvider, + log: params.Log, + headers: params.Headers, + connFactory: params.ConnFactory, + state: params.State, + metrics: params.AccessMetrics, + loggedScripts: loggedScripts, + nodeCommunicator: params.Communicator, + scriptExecutor: params.ScriptExecutor, + scriptExecMode: params.ScriptExecutionMode, + execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, }, backendEvents: backendEvents{ - log: params.Log, - chain: params.ChainID.Chain(), - state: params.State, - headers: params.Headers, - connFactory: params.ConnFactory, - maxHeightRange: params.MaxHeightRange, - nodeCommunicator: params.Communicator, - queryMode: params.EventQueryMode, - eventsIndex: params.EventsIndex, - execProvider: params.ExecNodeIdentitiesProvider, + log: params.Log, + chain: params.ChainID.Chain(), + state: params.State, + headers: params.Headers, + connFactory: params.ConnFactory, + maxHeightRange: params.MaxHeightRange, + nodeCommunicator: params.Communicator, + queryMode: params.EventQueryMode, + eventsIndex: params.EventsIndex, + execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, }, backendBlockHeaders: backendBlockHeaders{ headers: params.Headers, @@ -187,14 +187,14 @@ func New(params Params) (*Backend, error) { state: params.State, }, backendAccounts: backendAccounts{ - log: params.Log, - state: params.State, - headers: params.Headers, - connFactory: params.ConnFactory, - nodeCommunicator: params.Communicator, - scriptExecutor: params.ScriptExecutor, - scriptExecMode: params.ScriptExecutionMode, - execProvider: params.ExecNodeIdentitiesProvider, + log: params.Log, + state: params.State, + headers: params.Headers, + connFactory: params.ConnFactory, + nodeCommunicator: params.Communicator, + scriptExecutor: params.ScriptExecutor, + scriptExecMode: params.ScriptExecutionMode, + execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, }, backendExecutionResults: backendExecutionResults{ executionResults: params.ExecutionResults, @@ -244,7 +244,7 @@ func New(params Params) (*Backend, error) { txResultQueryMode: params.TxResultQueryMode, systemTx: systemTx, systemTxID: systemTxID, - execProvider: params.ExecNodeIdentitiesProvider, + execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, } // TODO: The TransactionErrorMessage interface should be reorganized in future, as it is implemented in backendTransactions but used in TransactionsLocalDataProvider, and its initialization is somewhat quirky. diff --git a/engine/access/rpc/backend/backend_accounts.go b/engine/access/rpc/backend/backend_accounts.go index a104d8e2d58..ca52455f7ba 100644 --- a/engine/access/rpc/backend/backend_accounts.go +++ b/engine/access/rpc/backend/backend_accounts.go @@ -25,14 +25,14 @@ import ( ) type backendAccounts struct { - log zerolog.Logger - state protocol.State - headers storage.Headers - connFactory connection.ConnectionFactory - nodeCommunicator Communicator - scriptExecutor execution.ScriptExecutor - scriptExecMode IndexQueryMode - execProvider *commonrpc.ExecutionNodeIdentitiesProvider + log zerolog.Logger + state protocol.State + headers storage.Headers + connFactory connection.ConnectionFactory + nodeCommunicator Communicator + scriptExecutor execution.ScriptExecutor + scriptExecMode IndexQueryMode + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } // GetAccount returns the account details at the latest sealed block. @@ -420,7 +420,7 @@ func (b *backendAccounts) getAccountFromAnyExeNode( BlockId: blockID[:], } - execNodes, err := b.execProvider.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, ) diff --git a/engine/access/rpc/backend/backend_accounts_test.go b/engine/access/rpc/backend/backend_accounts_test.go index 709ca93e132..7882c4bb020 100644 --- a/engine/access/rpc/backend/backend_accounts_test.go +++ b/engine/access/rpc/backend/backend_accounts_test.go @@ -83,7 +83,7 @@ func (s *BackendAccountsSuite) defaultBackend() *backendAccounts { headers: s.headers, connFactory: s.connectionFactory, nodeCommunicator: NewNodeCommunicator(false), - execProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + execNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( s.log, s.state, s.receipts, diff --git a/engine/access/rpc/backend/backend_events.go b/engine/access/rpc/backend/backend_events.go index 2d671bb6689..c3b872b0ad1 100644 --- a/engine/access/rpc/backend/backend_events.go +++ b/engine/access/rpc/backend/backend_events.go @@ -28,16 +28,16 @@ import ( ) type backendEvents struct { - headers storage.Headers - state protocol.State - chain flow.Chain - connFactory connection.ConnectionFactory - log zerolog.Logger - maxHeightRange uint - nodeCommunicator Communicator - queryMode IndexQueryMode - eventsIndex *index.EventsIndex - execProvider *rpc.ExecutionNodeIdentitiesProvider + headers storage.Headers + state protocol.State + chain flow.Chain + connFactory connection.ConnectionFactory + log zerolog.Logger + maxHeightRange uint + nodeCommunicator Communicator + queryMode IndexQueryMode + eventsIndex *index.EventsIndex + execNodeIdentitiesProvider *rpc.ExecutionNodeIdentitiesProvider } // blockMetadata is used to capture information about requested blocks to avoid repeated blockID @@ -303,7 +303,7 @@ func (b *backendEvents) getBlockEventsFromExecutionNode( // choose the last block ID to find the list of execution nodes lastBlockID := blockIDs[len(blockIDs)-1] - execNodes, err := b.execProvider.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, lastBlockID, ) diff --git a/engine/access/rpc/backend/backend_events_test.go b/engine/access/rpc/backend/backend_events_test.go index 08c94693a11..fae566baa9e 100644 --- a/engine/access/rpc/backend/backend_events_test.go +++ b/engine/access/rpc/backend/backend_events_test.go @@ -185,7 +185,7 @@ func (s *BackendEventsSuite) defaultBackend() *backendEvents { maxHeightRange: DefaultMaxHeightRange, queryMode: IndexQueryModeExecutionNodesOnly, eventsIndex: s.eventsIndex, - execProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + execNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( s.log, s.state, s.receipts, diff --git a/engine/access/rpc/backend/backend_scripts.go b/engine/access/rpc/backend/backend_scripts.go index 3ad1e48b79b..bea33fb5181 100644 --- a/engine/access/rpc/backend/backend_scripts.go +++ b/engine/access/rpc/backend/backend_scripts.go @@ -28,16 +28,16 @@ import ( const uniqueScriptLoggingTimeWindow = 10 * time.Minute type backendScripts struct { - log zerolog.Logger - headers storage.Headers - state protocol.State - connFactory connection.ConnectionFactory - metrics module.BackendScriptsMetrics - loggedScripts *lru.Cache[[md5.Size]byte, time.Time] - nodeCommunicator Communicator - scriptExecutor execution.ScriptExecutor - scriptExecMode IndexQueryMode - execProvider *commonrpc.ExecutionNodeIdentitiesProvider + log zerolog.Logger + headers storage.Headers + state protocol.State + connFactory connection.ConnectionFactory + metrics module.BackendScriptsMetrics + loggedScripts *lru.Cache[[md5.Size]byte, time.Time] + nodeCommunicator Communicator + scriptExecutor execution.ScriptExecutor + scriptExecMode IndexQueryMode + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } // scriptExecutionRequest encapsulates the data needed to execute a script to make it easier @@ -225,7 +225,7 @@ func (b *backendScripts) executeScriptOnAvailableExecutionNodes( r *scriptExecutionRequest, ) ([]byte, time.Duration, error) { // find few execution nodes which have executed the block earlier and provided an execution receipt for it - executors, err := b.execProvider.ExecutionNodesForBlockID(ctx, r.blockID) + executors, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID(ctx, r.blockID) if err != nil { return nil, 0, status.Errorf(codes.Internal, "failed to find script executors at blockId %v: %v", r.blockID.String(), err) } diff --git a/engine/access/rpc/backend/backend_scripts_test.go b/engine/access/rpc/backend/backend_scripts_test.go index 6365b4624c3..e4b6ccbee7b 100644 --- a/engine/access/rpc/backend/backend_scripts_test.go +++ b/engine/access/rpc/backend/backend_scripts_test.go @@ -104,7 +104,7 @@ func (s *BackendScriptsSuite) defaultBackend() *backendScripts { loggedScripts: loggedScripts, connFactory: s.connectionFactory, nodeCommunicator: NewNodeCommunicator(false), - execProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + execNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( s.log, s.state, s.receipts, diff --git a/engine/access/rpc/backend/backend_transactions.go b/engine/access/rpc/backend/backend_transactions.go index 6f5e37a88e6..408dd90d221 100644 --- a/engine/access/rpc/backend/backend_transactions.go +++ b/engine/access/rpc/backend/backend_transactions.go @@ -48,9 +48,9 @@ type backendTransactions struct { txResultCache *lru.Cache[flow.Identifier, *access.TransactionResult] txResultQueryMode IndexQueryMode - systemTxID flow.Identifier - systemTx *flow.TransactionBody - execProvider *commonrpc.ExecutionNodeIdentitiesProvider + systemTxID flow.Identifier + systemTx *flow.TransactionBody + execNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider } var _ TransactionErrorMessage = (*backendTransactions)(nil) @@ -411,7 +411,7 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromExecutionNode( BlockId: blockID[:], } - execNodes, err := b.execProvider.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, ) @@ -568,7 +568,7 @@ func (b *backendTransactions) getTransactionResultByIndexFromExecutionNode( Index: index, } - execNodes, err := b.execProvider.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, ) @@ -755,7 +755,7 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode( TransactionId: transactionID[:], } - execNodes, err := b.execProvider.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, ) @@ -995,7 +995,7 @@ func (b *backendTransactions) LookupErrorMessageByTransactionID( } } - execNodes, err := b.execProvider.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, ) @@ -1069,7 +1069,7 @@ func (b *backendTransactions) LookupErrorMessageByIndex( } } - execNodes, err := b.execProvider.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, ) @@ -1141,7 +1141,7 @@ func (b *backendTransactions) LookupErrorMessagesByBlockID( } } - execNodes, err := b.execProvider.ExecutionNodesForBlockID( + execNodes, err := b.execNodeIdentitiesProvider.ExecutionNodesForBlockID( ctx, blockID, ) diff --git a/engine/common/rpc/execution_node_identities_provider_test.go b/engine/common/rpc/execution_node_identities_provider_test.go index 75171de4d20..638b683b1cc 100644 --- a/engine/common/rpc/execution_node_identities_provider_test.go +++ b/engine/common/rpc/execution_node_identities_provider_test.go @@ -173,7 +173,7 @@ func (suite *Suite) TestExecutionNodesForBlockID() { expectedENs = flow.IdentityList{} } - execProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( suite.log, suite.state, suite.receipts, @@ -181,7 +181,7 @@ func (suite *Suite) TestExecutionNodesForBlockID() { fixedENIdentifiers, ) - allExecNodes, err := execProvider.ExecutionNodesForBlockID(context.Background(), block.ID()) + allExecNodes, err := execNodeIdentitiesProvider.ExecutionNodesForBlockID(context.Background(), block.ID()) require.NoError(suite.T(), err) execNodeSelectorFactory := backend.NewNodeSelectorFactory(false) @@ -212,7 +212,7 @@ func (suite *Suite) TestExecutionNodesForBlockID() { attempt3Receipts = flow.ExecutionReceiptList{} suite.state.On("AtBlockID", mock.Anything).Return(suite.snapshot) - execProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( suite.log, suite.state, suite.receipts, @@ -220,7 +220,7 @@ func (suite *Suite) TestExecutionNodesForBlockID() { flow.IdentifierList{}, ) - allExecNodes, err := execProvider.ExecutionNodesForBlockID(context.Background(), block.ID()) + allExecNodes, err := execNodeIdentitiesProvider.ExecutionNodesForBlockID(context.Background(), block.ID()) require.NoError(suite.T(), err) execNodeSelectorFactory := backend.NewNodeSelectorFactory(false) @@ -309,7 +309,7 @@ func (suite *Suite) TestExecutionNodesForBlockID() { additionalNode[0], } - execProvider := commonrpc.NewExecutionNodeIdentitiesProvider( + execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider( suite.log, suite.state, suite.receipts, @@ -317,7 +317,7 @@ func (suite *Suite) TestExecutionNodesForBlockID() { flow.IdentifierList{}, ) - chosenIDs := execProvider.ChooseFromPreferredENIDs(allExecutionNodes, executorIDs) + chosenIDs := execNodeIdentitiesProvider.ChooseFromPreferredENIDs(allExecutionNodes, executorIDs) require.ElementsMatch(suite.T(), chosenIDs, expectedOrder) require.Equal(suite.T(), len(chosenIDs), commonrpc.MaxNodesCnt) From 010492ecfaf7c3325ea346b28673dd763681253c Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Mon, 30 Sep 2024 16:59:44 +0300 Subject: [PATCH 4/6] Removed unused storages in test --- ...execution_node_identities_provider_test.go | 68 +------------------ 1 file changed, 3 insertions(+), 65 deletions(-) diff --git a/engine/common/rpc/execution_node_identities_provider_test.go b/engine/common/rpc/execution_node_identities_provider_test.go index 638b683b1cc..eb23faf34a6 100644 --- a/engine/common/rpc/execution_node_identities_provider_test.go +++ b/engine/common/rpc/execution_node_identities_provider_test.go @@ -4,24 +4,15 @@ import ( "context" "testing" - "github.com/dgraph-io/badger/v2" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - accessmock "github.com/onflow/flow-go/engine/access/mock" "github.com/onflow/flow-go/engine/access/rpc/backend" - backendmock "github.com/onflow/flow-go/engine/access/rpc/backend/mock" - connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" commonrpc "github.com/onflow/flow-go/engine/common/rpc" - "github.com/onflow/flow-go/engine/common/version" - "github.com/onflow/flow-go/fvm/blueprints" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/module/counters" protocol "github.com/onflow/flow-go/state/protocol/mock" - bstorage "github.com/onflow/flow-go/storage/badger" storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/unittest" ) @@ -33,30 +24,7 @@ type Suite struct { snapshot *protocol.Snapshot log zerolog.Logger - blocks *storagemock.Blocks - headers *storagemock.Headers - collections *storagemock.Collections - transactions *storagemock.Transactions - receipts *storagemock.ExecutionReceipts - results *storagemock.ExecutionResults - transactionResults *storagemock.LightTransactionResults - events *storagemock.Events - txErrorMessages *storagemock.TransactionResultErrorMessages - - db *badger.DB - dbDir string - lastFullBlockHeight *counters.PersistentStrictMonotonicCounter - versionControl *version.VersionControl - - colClient *accessmock.AccessAPIClient - execClient *accessmock.ExecutionAPIClient - historicalAccessClient *accessmock.AccessAPIClient - - connectionFactory *connectionmock.ConnectionFactory - communicator *backendmock.Communicator - - chainID flow.ChainID - systemTx *flow.TransactionBody + receipts *storagemock.ExecutionReceipts } func TestHandler(t *testing.T) { @@ -67,47 +35,17 @@ func (suite *Suite) SetupTest() { suite.log = zerolog.New(zerolog.NewConsoleWriter()) suite.state = new(protocol.State) suite.snapshot = new(protocol.Snapshot) + suite.receipts = new(storagemock.ExecutionReceipts) + header := unittest.BlockHeaderFixture() params := new(protocol.Params) params.On("FinalizedRoot").Return(header, nil) - params.On("SporkID").Return(unittest.IdentifierFixture(), nil) - params.On("ProtocolVersion").Return(uint(unittest.Uint64InRange(10, 30)), nil) - params.On("SporkRootBlockHeight").Return(header.Height, nil) - params.On("SealedRoot").Return(header, nil) suite.state.On("Params").Return(params) - - suite.blocks = new(storagemock.Blocks) - suite.headers = new(storagemock.Headers) - suite.transactions = new(storagemock.Transactions) - suite.collections = new(storagemock.Collections) - suite.receipts = new(storagemock.ExecutionReceipts) - suite.results = new(storagemock.ExecutionResults) - suite.colClient = new(accessmock.AccessAPIClient) - suite.execClient = new(accessmock.ExecutionAPIClient) - suite.transactionResults = storagemock.NewLightTransactionResults(suite.T()) - suite.events = storagemock.NewEvents(suite.T()) - suite.chainID = flow.Testnet - suite.historicalAccessClient = new(accessmock.AccessAPIClient) - suite.connectionFactory = connectionmock.NewConnectionFactory(suite.T()) - - suite.communicator = new(backendmock.Communicator) - - var err error - suite.systemTx, err = blueprints.SystemChunkTransaction(flow.Testnet.Chain()) - suite.Require().NoError(err) - - suite.db, suite.dbDir = unittest.TempBadgerDB(suite.T()) - suite.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressLastFullBlockHeight), - 0, - ) - suite.Require().NoError(err) } // TestExecutionNodesForBlockID tests the ExecutionNodesForBlockID used for serving all API calls // that need to talk to an execution node. func (suite *Suite) TestExecutionNodesForBlockID() { - totalReceipts := 5 block := unittest.BlockFixture() From cce8ebdc063a1274cddd05e51d2052e6795994ba Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Tue, 1 Oct 2024 16:19:13 +0300 Subject: [PATCH 5/6] Added testing db Exists --- .../transaction_result_error_messages_test.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/storage/badger/transaction_result_error_messages_test.go b/storage/badger/transaction_result_error_messages_test.go index 9ba95c1e998..e0513516a79 100644 --- a/storage/badger/transaction_result_error_messages_test.go +++ b/storage/badger/transaction_result_error_messages_test.go @@ -23,6 +23,12 @@ func TestStoringTransactionResultErrorMessages(t *testing.T) { store := bstorage.NewTransactionResultErrorMessages(metrics, db, 1000) blockID := unittest.IdentifierFixture() + + // test db Exists by block id + exists, err := store.Exists(blockID) + require.NoError(t, err) + require.False(t, exists) + txErrorMessages := make([]flow.TransactionResultErrorMessage, 0) for i := 0; i < 10; i++ { expected := flow.TransactionResultErrorMessage{ @@ -33,8 +39,13 @@ func TestStoringTransactionResultErrorMessages(t *testing.T) { } txErrorMessages = append(txErrorMessages, expected) } - err := store.Store(blockID, txErrorMessages) + err = store.Store(blockID, txErrorMessages) + require.NoError(t, err) + + // test db Exists by block id + exists, err = store.Exists(blockID) require.NoError(t, err) + require.True(t, exists) // check retrieving by ByBlockIDTransactionID for _, txErrorMessage := range txErrorMessages { From 54ffb077f4548e4c36bb4b8c6f27c145cc8950c4 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Thu, 24 Oct 2024 12:46:16 +0300 Subject: [PATCH 6/6] Updated godoc and tests --- .../tx_error_messages_core_test.go | 8 +------- .../rpc/execution_node_identities_provider.go | 2 -- .../execution_node_identities_provider_test.go | 15 +++++++++------ 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go b/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go index acb21c7e484..aa8a51b42f8 100644 --- a/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go +++ b/engine/access/ingestion/tx_error_messages/tx_error_messages_core_test.go @@ -259,13 +259,7 @@ func (s *TxErrorMessagesCoreSuite) initCore() *TxErrorMessagesCore { s.log, backend, s.txErrorMessages, - commonrpc.NewExecutionNodeIdentitiesProvider( - s.log, - s.proto.state, - s.receipts, - flow.IdentifierList{}, - flow.IdentifierList{}, - ), + execNodeIdentitiesProvider, ) return core } diff --git a/engine/common/rpc/execution_node_identities_provider.go b/engine/common/rpc/execution_node_identities_provider.go index 0a56c53a08a..75a1f0b33bf 100644 --- a/engine/common/rpc/execution_node_identities_provider.go +++ b/engine/common/rpc/execution_node_identities_provider.go @@ -58,8 +58,6 @@ type ExecutionNodeIdentitiesProvider struct { // are prioritized during selection. // - fixedENIdentifiers: A flow.IdentifierList of fixed execution node identifiers that are // always considered if available. -// -// No error returns are expected during normal operations. func NewExecutionNodeIdentitiesProvider( log zerolog.Logger, state protocol.State, diff --git a/engine/common/rpc/execution_node_identities_provider_test.go b/engine/common/rpc/execution_node_identities_provider_test.go index eb23faf34a6..2b033e3dac0 100644 --- a/engine/common/rpc/execution_node_identities_provider_test.go +++ b/engine/common/rpc/execution_node_identities_provider_test.go @@ -17,7 +17,8 @@ import ( "github.com/onflow/flow-go/utils/unittest" ) -type Suite struct { +// ENIdentitiesProviderSuite is a test suite for testing the ExecutionNodeIdentitiesProvider. +type ENIdentitiesProviderSuite struct { suite.Suite state *protocol.State @@ -28,10 +29,11 @@ type Suite struct { } func TestHandler(t *testing.T) { - suite.Run(t, new(Suite)) + suite.Run(t, new(ENIdentitiesProviderSuite)) } -func (suite *Suite) SetupTest() { +// SetupTest initializes the test suite with mock state and receipts storage. +func (suite *ENIdentitiesProviderSuite) SetupTest() { suite.log = zerolog.New(zerolog.NewConsoleWriter()) suite.state = new(protocol.State) suite.snapshot = new(protocol.Snapshot) @@ -43,9 +45,10 @@ func (suite *Suite) SetupTest() { suite.state.On("Params").Return(params) } -// TestExecutionNodesForBlockID tests the ExecutionNodesForBlockID used for serving all API calls -// that need to talk to an execution node. -func (suite *Suite) TestExecutionNodesForBlockID() { +// TestExecutionNodesForBlockID tests the ExecutionNodesForBlockID function. +// This function is responsible for retrieving execution nodes used to serve +// all API calls that interact with execution nodes. +func (suite *ENIdentitiesProviderSuite) TestExecutionNodesForBlockID() { totalReceipts := 5 block := unittest.BlockFixture()