Skip to content

Commit

Permalink
Merge pull request #6499 from The-K-R-O-K/UlyanaAndrukhiv/6497-refact…
Browse files Browse the repository at this point in the history
…or-executionNodesForBlockID

[Access] Refactor executionNodesForBlockID by encapsulating its parameters into a struct
  • Loading branch information
peterargue authored Nov 1, 2024
2 parents b5fab5c + 54ffb07 commit 0fba2ae
Show file tree
Hide file tree
Showing 19 changed files with 760 additions and 611 deletions.
99 changes: 52 additions & 47 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ type FlowAccessNodeBuilder struct {
stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend

TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider
TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
}

func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
Expand Down Expand Up @@ -1970,46 +1971,63 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

}

preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

builder.ExecNodeIdentitiesProvider = commonrpc.NewExecutionNodeIdentitiesProvider(
node.Logger,
node.State,
node.Storage.Receipts,
preferredENIdentifiers,
fixedENIdentifiers,
)

builder.nodeBackend, err = backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
RetryEnabled: builder.retryEnabled,
MaxHeightRange: backendConfig.MaxHeightRange,
PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs,
FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
TxResultCacheSize: builder.TxResultCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
CheckPayerBalanceMode: checkPayerBalanceMode,
EventQueryMode: eventQueryMode,
BlockTracker: blockTracker,
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
TxResultErrorMessages: node.Storage.TransactionResultErrorMessages,
ChainID: node.RootChainID,
AccessMetrics: builder.AccessMetrics,
ConnFactory: connFactory,
RetryEnabled: builder.retryEnabled,
MaxHeightRange: backendConfig.MaxHeightRange,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
TxResultCacheSize: builder.TxResultCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
CheckPayerBalanceMode: checkPayerBalanceMode,
EventQueryMode: eventQueryMode,
BlockTracker: blockTracker,
SubscriptionHandler: subscription.NewSubscriptionHandler(
builder.Logger,
broadcaster,
builder.stateStreamConf.ClientSendTimeout,
builder.stateStreamConf.ResponseLimit,
builder.stateStreamConf.ClientSendBufferSize,
),
EventsIndex: builder.EventsIndex,
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
LastFullBlockHeight: lastFullBlockHeight,
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
EventsIndex: builder.EventsIndex,
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
LastFullBlockHeight: lastFullBlockHeight,
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
ExecNodeIdentitiesProvider: builder.ExecNodeIdentitiesProvider,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down Expand Up @@ -2063,25 +2081,12 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("could not create requester engine: %w", err)
}

preferredENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(builder.rpcConf.BackendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

if builder.storeTxResultErrorMessages {
builder.TxResultErrorMessagesCore = tx_error_messages.NewTxErrorMessagesCore(
node.Logger,
node.State,
builder.nodeBackend,
node.Storage.Receipts,
node.Storage.TransactionResultErrorMessages,
preferredENIdentifiers,
fixedENIdentifiers,
builder.ExecNodeIdentitiesProvider,
)
}

Expand Down
58 changes: 38 additions & 20 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/common/follower"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/stop"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/common/version"
Expand Down Expand Up @@ -1869,34 +1870,51 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
indexReporter = builder.Reporter
}

preferredENIdentifiers, err := commonrpc.IdentifierList(backendConfig.PreferredExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for preferred EN map: %w", err)
}

fixedENIdentifiers, err := commonrpc.IdentifierList(backendConfig.FixedExecutionNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to convert node id string to Flow Identifier for fixed EN map: %w", err)
}

execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
node.Logger,
node.State,
node.Storage.Receipts,
preferredENIdentifiers,
fixedENIdentifiers,
)

backendParams := backend.Params{
State: node.State,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
ChainID: node.RootChainID,
AccessMetrics: accessMetrics,
ConnFactory: connFactory,
RetryEnabled: false,
MaxHeightRange: backendConfig.MaxHeightRange,
PreferredExecutionNodeIDs: backendConfig.PreferredExecutionNodeIDs,
FixedExecutionNodeIDs: backendConfig.FixedExecutionNodeIDs,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
BlockTracker: blockTracker,
State: node.State,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
ExecutionResults: node.Storage.Results,
ChainID: node.RootChainID,
AccessMetrics: accessMetrics,
ConnFactory: connFactory,
RetryEnabled: false,
MaxHeightRange: backendConfig.MaxHeightRange,
Log: node.Logger,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(backendConfig.CircuitBreakerConfig.Enabled),
BlockTracker: blockTracker,
SubscriptionHandler: subscription.NewSubscriptionHandler(
builder.Logger,
broadcaster,
builder.stateStreamConf.ClientSendTimeout,
builder.stateStreamConf.ResponseLimit,
builder.stateStreamConf.ClientSendBufferSize,
),
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
IndexReporter: indexReporter,
VersionControl: builder.VersionControl,
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
}

if builder.localServiceAPIEnabled {
Expand Down
128 changes: 77 additions & 51 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/onflow/flow-go/engine/access/rpc/backend"
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
"github.com/onflow/flow-go/engine/access/subscription"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/factory"
Expand Down Expand Up @@ -644,23 +645,32 @@ func (suite *Suite) TestGetSealedTransaction() {
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(suite.T(), err)

bnd, err := backend.New(backend.Params{State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: enNodeIDs.Strings(),
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
suite.log,
suite.state,
receipts,
enNodeIDs,
nil,
)

bnd, err := backend.New(backend.Params{
State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
})
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -826,23 +836,31 @@ func (suite *Suite) TestGetTransactionResult() {
blockTransactions, err := stdmap.NewIdentifierMap(100)
require.NoError(suite.T(), err)

execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
suite.log,
suite.state,
receipts,
enNodeIDs,
nil,
)

bnd, err := backend.New(backend.Params{State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: enNodeIDs.Strings(),
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
})
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -1056,26 +1074,34 @@ func (suite *Suite) TestExecuteScript() {
connFactory := connectionmock.NewConnectionFactory(suite.T())
connFactory.On("GetExecutionAPIClient", mock.Anything).Return(suite.execClient, &mockCloser{}, nil)

execNodeIdentitiesProvider := commonrpc.NewExecutionNodeIdentitiesProvider(
suite.log,
suite.state,
receipts,
nil,
identities.NodeIDs(),
)

var err error
suite.backend, err = backend.New(backend.Params{
State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
FixedExecutionNodeIDs: (identities.NodeIDs()).Strings(),
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly,
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
State: suite.state,
CollectionRPC: suite.collClient,
Blocks: all.Blocks,
Headers: all.Headers,
Collections: collections,
Transactions: transactions,
ExecutionReceipts: receipts,
ExecutionResults: results,
ChainID: suite.chainID,
AccessMetrics: suite.metrics,
ConnFactory: connFactory,
MaxHeightRange: backend.DefaultMaxHeightRange,
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly,
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
ExecNodeIdentitiesProvider: execNodeIdentitiesProvider,
})
require.NoError(suite.T(), err)

Expand Down
Loading

0 comments on commit 0fba2ae

Please sign in to comment.