From fa80f2c4a6286adb75c8c1c5bd2ee7b2b2d76174 Mon Sep 17 00:00:00 2001 From: Anton Korpusenko Date: Mon, 22 Jul 2024 20:12:03 +0300 Subject: [PATCH 1/5] removed skip blinded + wrapped qbft timeout tests (#1494) --- protocol/v2/qbft/spectest/qbft_mapping_test.go | 5 ++++- protocol/v2/ssv/spectest/ssv_mapping_test.go | 5 ----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/protocol/v2/qbft/spectest/qbft_mapping_test.go b/protocol/v2/qbft/spectest/qbft_mapping_test.go index 486b3a208e..f59faa801c 100644 --- a/protocol/v2/qbft/spectest/qbft_mapping_test.go +++ b/protocol/v2/qbft/spectest/qbft_mapping_test.go @@ -108,8 +108,11 @@ func TestQBFTMapping(t *testing.T) { err = pre.Decode(preByts) require.NoError(t, err) typedTest.Pre = pre + t.Run(typedTest.Name, func(t *testing.T) { // using only spec struct so no need to run our version (TODO: check how we choose leader) + t.Parallel() + RunTimeout(t, typedTest) + }) - RunTimeout(t, typedTest) default: t.Fatalf("unsupported test type %s [%s]", testType, testName) } diff --git a/protocol/v2/ssv/spectest/ssv_mapping_test.go b/protocol/v2/ssv/spectest/ssv_mapping_test.go index a46d06e419..0dc530be25 100644 --- a/protocol/v2/ssv/spectest/ssv_mapping_test.go +++ b/protocol/v2/ssv/spectest/ssv_mapping_test.go @@ -78,11 +78,6 @@ func prepareTest(t *testing.T, logger *zap.Logger, name string, test interface{} switch testType { case reflect.TypeOf(&tests.MsgProcessingSpecTest{}).String(): - // TODO: fix blinded test - if strings.Contains(testName, "propose regular decide blinded") || strings.Contains(testName, "propose blinded decide regular") { - logger.Info("skipping blinded block test", zap.String("test", testName)) - return nil - } typedTest := msgProcessingSpecTestFromMap(t, test.(map[string]interface{})) return &runnable{ From 0060cf072f1acbeb7f42fbc95618798628034c00 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 22 Jul 2024 23:41:28 +0400 Subject: [PATCH 2/5] validatorStore: fix bug with indices on share update (#1495) validatorStore: fix bug with indices on share update --- registry/storage/validatorstore.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index cf5cb65f00..e2fbae9a38 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -307,15 +307,12 @@ func (c *validatorStore) handleShareUpdated(share *types.SSVShare) { // Update byCommitteeID for _, committee := range c.byCommitteeID { + if committee.ID != share.CommitteeID() { + continue + } for i, validator := range committee.Validators { if validator.ValidatorPubKey == share.ValidatorPubKey { committee.Validators[i] = share - break - } - } - - for i, index := range committee.Indices { - if index == share.ValidatorIndex { committee.Indices[i] = share.ValidatorIndex break } From 302b9908c90cf0a6988f679dc6d50e7f449f59ba Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 23 Jul 2024 01:49:44 +0400 Subject: [PATCH 3/5] message validation: add comments when getting first validator index (#1496) * message validation: check indices length * add a comment * make comment clearer --- message/validation/common_checks.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/message/validation/common_checks.go b/message/validation/common_checks.go index 9c17c6d948..76e611d2a1 100644 --- a/message/validation/common_checks.go +++ b/message/validation/common_checks.go @@ -101,7 +101,9 @@ func (mv *messageValidator) validateBeaconDuty( // Rule: For a proposal duty message, we check if the validator is assigned to it if role == spectypes.RoleProposer { epoch := mv.netCfg.Beacon.EstimatedEpochAtSlot(slot) - if mv.dutyStore.Proposer.ValidatorDuty(epoch, slot, indices[0]) == nil { + // Non-committee roles always have one validator index. + validatorIndex := indices[0] + if mv.dutyStore.Proposer.ValidatorDuty(epoch, slot, validatorIndex) == nil { return ErrNoDuty } } @@ -109,7 +111,9 @@ func (mv *messageValidator) validateBeaconDuty( // Rule: For a sync committee aggregation duty message, we check if the validator is assigned to it if role == spectypes.RoleSyncCommitteeContribution { period := mv.netCfg.Beacon.EstimatedSyncCommitteePeriodAtEpoch(mv.netCfg.Beacon.EstimatedEpochAtSlot(slot)) - if mv.dutyStore.SyncCommittee.Duty(period, indices[0]) == nil { + // Non-committee roles always have one validator index. + validatorIndex := indices[0] + if mv.dutyStore.SyncCommittee.Duty(period, validatorIndex) == nil { return ErrNoDuty } } From 1221dd3b43815ecb4339c5949ced9e1a01052cf2 Mon Sep 17 00:00:00 2001 From: guym-blox <83158283+guym-blox@users.noreply.github.com> Date: Tue, 23 Jul 2024 13:37:36 +0300 Subject: [PATCH 4/5] Gm/no fork lint fix (#1447) * checkpoint * checkpoint * fix validation_test * checkpoint * change BN to be public and fix storage roles * fix lint issue using ssvspec role instead of convert role * fix lint issue using ssvspec role instead of convert role * fix lint left overs issues * fix lint left overs issues * fix lint issues on controller_test.go and msg_validator_test.go * fix lint issues on controller_test.go and msg_validator_test.go * fix lint issues on msg_processing_type.go and ssv_mapping_test.go * fix lint issues on p2p_test.go and p2p2_validation_test.go * fix lint issues on topics/controller.go and runner/committee/go * change go version in lint.yml * change lint.yml go version back to 1.20.x * update Makefile * update Makefile * update Makefile --------- Co-authored-by: guy muroch Co-authored-by: y0sher --- Makefile | 2 +- eth/eventhandler/handlers.go | 14 +- ibft/storage/stores_test.go | 3 +- integration/qbft/tests/scenario_test.go | 26 +-- .../qbft/tests/temp_testing_beacon_network.go | 42 ++--- message/validation/consensus_validation.go | 4 - network/discovery/dv5_service_test.go | 3 +- network/p2p/p2p_test.go | 176 +++++++++++++++++- network/p2p/p2p_validation_test.go | 121 +++++++++++- network/topics/controller.go | 2 +- network/topics/controller_test.go | 43 ++--- network/topics/msg_validator_test.go | 22 +-- network/topics/params/message_rate.go | 2 +- operator/validator/controller.go | 9 +- protocol/v2/qbft/instance/instance.go | 13 +- protocol/v2/qbft/instance/round_change.go | 4 +- protocol/v2/qbft/spectest/controller_type.go | 3 +- .../v2/qbft/spectest/msg_processing_type.go | 3 +- .../v2/qbft/spectest/qbft_mapping_test.go | 4 +- protocol/v2/qbft/testing/utils.go | 4 +- protocol/v2/ssv/runner/committee.go | 6 +- .../ssv/runner/sync_committee_aggregator.go | 3 + protocol/v2/ssv/spectest/debug_states.go | 2 +- .../v2/ssv/spectest/msg_processing_type.go | 36 ++-- protocol/v2/ssv/spectest/ssv_mapping_test.go | 5 +- protocol/v2/ssv/testing/runner.go | 21 ++- protocol/v2/ssv/validator/committee.go | 58 +++--- .../ssv/validator/non_committee_validator.go | 2 +- protocol/v2/testing/test_utils.go | 46 +++-- 29 files changed, 470 insertions(+), 209 deletions(-) diff --git a/Makefile b/Makefile index ead718961a..5f43206366 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ COV_CMD="-cover" ifeq ($(COVERAGE),true) COV_CMD=-coverpkg=./... -covermode="atomic" -coverprofile="coverage.out" endif -UNFORMATTED=$(shell gofmt -s -l .) +UNFORMATTED=$(shell gofmt -l .) #Lint .PHONY: lint-prepare diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index 2aa84c5d8b..d6ea863c8c 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -277,13 +277,11 @@ func (eh *EventHandler) validatorAddedEventToShare( selfOperatorID := eh.operatorDataStore.GetOperatorID() var shareSecret *bls.SecretKey - operators := make([]*spectypes.Operator, 0) - committee := make([]*spectypes.CommitteeMember, 0) shareMembers := make([]*spectypes.ShareMember, 0) for i := range event.OperatorIds { operatorID := event.OperatorIds[i] - od, found, err := eh.nodeStorage.GetOperatorData(txn, operatorID) + _, found, err := eh.nodeStorage.GetOperatorData(txn, operatorID) if err != nil { return nil, nil, fmt.Errorf("could not get operator data: %w", err) } @@ -293,11 +291,6 @@ func (eh *EventHandler) validatorAddedEventToShare( } } - committee = append(committee, &spectypes.CommitteeMember{ - OperatorID: operatorID, - SSVOperatorPubKey: od.PublicKey, - }) - shareMembers = append(shareMembers, &spectypes.ShareMember{ Signer: operatorID, SharePubKey: sharePublicKeys[i], @@ -327,11 +320,6 @@ func (eh *EventHandler) validatorAddedEventToShare( Err: errors.New("share private key does not match public key"), } } - - operators = append(operators, &spectypes.Operator{ - OperatorID: operatorID, - SSVOperatorPubKey: od.PublicKey, - }) } validatorShare.DomainType = eh.networkConfig.DomainType() diff --git a/ibft/storage/stores_test.go b/ibft/storage/stores_test.go index 46e93d1dfa..0fc9c640ee 100644 --- a/ibft/storage/stores_test.go +++ b/ibft/storage/stores_test.go @@ -36,9 +36,10 @@ func TestQBFTStores(t *testing.T) { id := []byte{1, 2, 3} - qbftMap.Each(func(role convert.RunnerRole, store qbftstorage.QBFTStore) error { + err = qbftMap.Each(func(role convert.RunnerRole, store qbftstorage.QBFTStore) error { return store.SaveInstance(&qbftstorage.StoredInstance{State: &specqbft.State{Height: 1, ID: id}}) }) + require.NoError(t, err) instance, err := qbftMap.Get(convert.RoleCommittee).GetInstance(id, 1) require.NoError(t, err) diff --git a/integration/qbft/tests/scenario_test.go b/integration/qbft/tests/scenario_test.go index f534bfeabb..e30f353338 100644 --- a/integration/qbft/tests/scenario_test.go +++ b/integration/qbft/tests/scenario_test.go @@ -2,6 +2,7 @@ package tests import ( "context" + "github.com/ssvlabs/ssv/exporter/convert" "testing" "time" @@ -81,12 +82,13 @@ func (s *Scenario) Run(t *testing.T, role spectypes.BeaconRole) { //validating state of validator after invoking duties for id, validationFunc := range s.ValidationFunctions { - identifier := spectypes.NewMsgID(networkconfig.TestNetwork.Domain, getKeySet(s.Committee).ValidatorPK.Serialize(), spectypes.MapDutyToRunnerRole(role)) + identifier := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), getKeySet(s.Committee).ValidatorPK.Serialize(), spectypes.MapDutyToRunnerRole(role)) //getting stored state of validator var storedInstance *protocolstorage.StoredInstance for { + role := convert.MessageIDFromBytes(identifier[:]).GetRoleType() var err error - storedInstance, err = s.validators[id].Storage.Get(spectypes.MessageIDFromBytes(identifier[:]).GetRoleType()).GetHighestInstance(identifier[:]) + storedInstance, err = s.validators[id].Storage.Get(role).GetHighestInstance(identifier[:]) require.NoError(t, err) if storedInstance != nil { @@ -143,7 +145,6 @@ func testingShare(keySet *spectestingutils.TestKeySet, id spectypes.OperatorID) ValidatorPubKey: spectypes.ValidatorPK(keySet.ValidatorPK.Serialize()), SharePubKey: keySet.Shares[id].GetPublicKey().Serialize(), DomainType: testingutils.TestingSSVDomainType, - Quorum: keySet.Threshold, Committee: keySet.Committee(), } } @@ -160,17 +161,16 @@ func newStores(logger *zap.Logger) *qbftstorage.QBFTStores { storageMap := qbftstorage.NewStores() - roles := []spectypes.BeaconRole{ - spectypes.BNRoleAttester, - spectypes.BNRoleProposer, - spectypes.BNRoleAggregator, - spectypes.BNRoleSyncCommittee, - spectypes.BNRoleSyncCommitteeContribution, - spectypes.BNRoleValidatorRegistration, - spectypes.BNRoleVoluntaryExit, + roles := []convert.RunnerRole{ + convert.RoleCommittee, + convert.RoleProposer, + convert.RoleAggregator, + convert.RoleSyncCommitteeContribution, + convert.RoleValidatorRegistration, + convert.RoleVoluntaryExit, } for _, role := range roles { - storageMap.Add(spectypes.MapDutyToRunnerRole(role), qbftstorage.New(db, role.String())) + storageMap.Add(role, qbftstorage.New(db, role.String())) } return storageMap @@ -200,7 +200,7 @@ func createValidator(t *testing.T, pCtx context.Context, id spectypes.OperatorID Liquidated: false, }, }, - Beacon: spectestingutils.NewTestingBeaconNode(), + Beacon: NewTestingBeaconNodeWrapped(), Signer: km, } diff --git a/integration/qbft/tests/temp_testing_beacon_network.go b/integration/qbft/tests/temp_testing_beacon_network.go index c9a1c44c19..73160aaf46 100644 --- a/integration/qbft/tests/temp_testing_beacon_network.go +++ b/integration/qbft/tests/temp_testing_beacon_network.go @@ -14,73 +14,73 @@ import ( type TestingBeaconNodeWrapped struct { beacon.BeaconNode - bn *spectestingutils.TestingBeaconNode + Bn *spectestingutils.TestingBeaconNode } func (bn *TestingBeaconNodeWrapped) SetSyncCommitteeAggregatorRootHexes(roots map[string]bool) { - bn.bn.SetSyncCommitteeAggregatorRootHexes(roots) + bn.Bn.SetSyncCommitteeAggregatorRootHexes(roots) } func (bn *TestingBeaconNodeWrapped) GetBroadcastedRoots() []phase0.Root { - return bn.bn.BroadcastedRoots + return bn.Bn.BroadcastedRoots } func (bn *TestingBeaconNodeWrapped) GetBeaconNode() *spectestingutils.TestingBeaconNode { - return bn.bn + return bn.Bn } func (bn *TestingBeaconNodeWrapped) GetAttestationData(slot phase0.Slot, committeeIndex phase0.CommitteeIndex) (*phase0.AttestationData, spec.DataVersion, error) { - return bn.bn.GetAttestationData(slot, committeeIndex) + return bn.Bn.GetAttestationData(slot, committeeIndex) } func (bn *TestingBeaconNodeWrapped) DomainData(epoch phase0.Epoch, domain phase0.DomainType) (phase0.Domain, error) { - return bn.bn.DomainData(epoch, domain) + return bn.Bn.DomainData(epoch, domain) } func (bn *TestingBeaconNodeWrapped) SyncCommitteeSubnetID(index phase0.CommitteeIndex) (uint64, error) { - return bn.bn.SyncCommitteeSubnetID(index) + return bn.Bn.SyncCommitteeSubnetID(index) } func (bn *TestingBeaconNodeWrapped) IsSyncCommitteeAggregator(proof []byte) (bool, error) { - return bn.bn.IsSyncCommitteeAggregator(proof) + return bn.Bn.IsSyncCommitteeAggregator(proof) } func (bn *TestingBeaconNodeWrapped) GetSyncCommitteeContribution(slot phase0.Slot, selectionProofs []phase0.BLSSignature, subnetIDs []uint64) (ssz.Marshaler, spec.DataVersion, error) { - return bn.bn.GetSyncCommitteeContribution(slot, selectionProofs, subnetIDs) + return bn.Bn.GetSyncCommitteeContribution(slot, selectionProofs, subnetIDs) } func (bn *TestingBeaconNodeWrapped) SubmitAggregateSelectionProof(slot phase0.Slot, committeeIndex phase0.CommitteeIndex, committeeLength uint64, index phase0.ValidatorIndex, slotSig []byte) (ssz.Marshaler, spec.DataVersion, error) { - return bn.bn.SubmitAggregateSelectionProof(slot, committeeIndex, committeeLength, index, slotSig) + return bn.Bn.SubmitAggregateSelectionProof(slot, committeeIndex, committeeLength, index, slotSig) } func (bn *TestingBeaconNodeWrapped) GetBeaconNetwork() spectypes.BeaconNetwork { - return bn.bn.GetBeaconNetwork() + return bn.Bn.GetBeaconNetwork() } func (bn *TestingBeaconNodeWrapped) GetBeaconBlock(slot phase0.Slot, graffiti, randao []byte) (ssz.Marshaler, spec.DataVersion, error) { - return bn.bn.GetBeaconBlock(slot, graffiti, randao) + return bn.Bn.GetBeaconBlock(slot, graffiti, randao) } func (bn *TestingBeaconNodeWrapped) SubmitValidatorRegistration(pubkey []byte, feeRecipient bellatrix.ExecutionAddress, sig phase0.BLSSignature) error { - return bn.bn.SubmitValidatorRegistration(pubkey, feeRecipient, sig) + return bn.Bn.SubmitValidatorRegistration(pubkey, feeRecipient, sig) } func (bn *TestingBeaconNodeWrapped) SubmitVoluntaryExit(voluntaryExit *phase0.SignedVoluntaryExit) error { - return bn.bn.SubmitVoluntaryExit(voluntaryExit) + return bn.Bn.SubmitVoluntaryExit(voluntaryExit) } func (bn *TestingBeaconNodeWrapped) SubmitAttestations(attestations []*phase0.Attestation) error { - return bn.bn.SubmitAttestations(attestations) + return bn.Bn.SubmitAttestations(attestations) } func (bn *TestingBeaconNodeWrapped) SubmitSyncMessages(msgs []*altair.SyncCommitteeMessage) error { - return bn.bn.SubmitSyncMessages(msgs) + return bn.Bn.SubmitSyncMessages(msgs) } func (bn *TestingBeaconNodeWrapped) SubmitBlindedBeaconBlock(block *api.VersionedBlindedProposal, sig phase0.BLSSignature) error { - return bn.bn.SubmitBlindedBeaconBlock(block, sig) + return bn.Bn.SubmitBlindedBeaconBlock(block, sig) } func (bn *TestingBeaconNodeWrapped) SubmitSignedContributionAndProof(contribution *altair.SignedContributionAndProof) error { - return bn.bn.SubmitSignedContributionAndProof(contribution) + return bn.Bn.SubmitSignedContributionAndProof(contribution) } func (bn *TestingBeaconNodeWrapped) SubmitSignedAggregateSelectionProof(msg *phase0.SignedAggregateAndProof) error { - return bn.bn.SubmitSignedAggregateSelectionProof(msg) + return bn.Bn.SubmitSignedAggregateSelectionProof(msg) } func (bn *TestingBeaconNodeWrapped) SubmitBeaconBlock(block *api.VersionedProposal, sig phase0.BLSSignature) error { - return bn.bn.SubmitBeaconBlock(block, sig) + return bn.Bn.SubmitBeaconBlock(block, sig) } func NewTestingBeaconNodeWrapped() beacon.BeaconNode { bnw := &TestingBeaconNodeWrapped{} - bnw.bn = spectestingutils.NewTestingBeaconNode() + bnw.Bn = spectestingutils.NewTestingBeaconNode() return bnw } diff --git a/message/validation/consensus_validation.go b/message/validation/consensus_validation.go index fbfc09de7d..c9fe569e34 100644 --- a/message/validation/consensus_validation.go +++ b/message/validation/consensus_validation.go @@ -330,10 +330,6 @@ func (mv *messageValidator) processSignerState(signedSSVMessage *spectypes.Signe return nil } -func (mv *messageValidator) maxSlotsInState() phase0.Slot { - return phase0.Slot(mv.netCfg.SlotsPerEpoch()) + lateSlotAllowance -} - func (mv *messageValidator) validateJustifications(message *specqbft.Message) error { pj, err := message.GetPrepareJustifications() if err != nil { diff --git a/network/discovery/dv5_service_test.go b/network/discovery/dv5_service_test.go index ebe4b2dce3..f1129f0733 100644 --- a/network/discovery/dv5_service_test.go +++ b/network/discovery/dv5_service_test.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "github.com/ssvlabs/ssv/networkconfig" "net" "os" "testing" @@ -116,7 +117,7 @@ func TestCheckPeer(t *testing.T) { ctx: ctx, conns: &mock.MockConnectionIndex{LimitValue: true}, subnetsIdx: subnetIndex, - domainType: &TestDomainType{myDomainType}, + domainType: networkconfig.TestNetwork, subnets: mySubnets, } diff --git a/network/p2p/p2p_test.go b/network/p2p/p2p_test.go index 3db2bcbc36..c5e788093f 100644 --- a/network/p2p/p2p_test.go +++ b/network/p2p/p2p_test.go @@ -3,6 +3,13 @@ package p2pv1 import ( "context" "encoding/hex" + "github.com/pkg/errors" + genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft" + "github.com/ssvlabs/ssv/network" + "github.com/ssvlabs/ssv/network/commons" + "github.com/ssvlabs/ssv/protocol/v2/message" + p2pprotocol "github.com/ssvlabs/ssv/protocol/v2/p2p" + "github.com/ssvlabs/ssv/protocol/v2/ssv/queue" "sync" "sync/atomic" "testing" @@ -14,7 +21,6 @@ import ( specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" - spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/networkconfig" ) @@ -33,13 +39,12 @@ func TestP2pNetwork_SubscribeBroadcast(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ks := spectestingutils.Testing4SharesSet() pks := []string{"8e80066551a81b318258709edaf7dd1f63cd686a0e4db8b29bbb7acfe65608677af5a527d9448ee47835485e02b50bc0"} - ln, routers, err := CreateNetworkAndSubscribeFromKeySet(t, ctx, LocalNetOptions{ + ln, routers, err := createNetworkAndSubscribe(t, ctx, LocalNetOptions{ Nodes: n, MinConnected: n/2 - 1, UseDiscv5: false, - }, ks, pks...) + }, pks...) require.NoError(t, err) require.NotNil(t, routers) require.NotNil(t, ln) @@ -111,12 +116,11 @@ func TestP2pNetwork_Stream(t *testing.T) { defer cancel() pkHex := "8e80066551a81b318258709edaf7dd1f63cd686a0e4db8b29bbb7acfe65608677af5a527d9448ee47835485e02b50bc0" - ks := spectestingutils.Testing4SharesSet() - ln, _, err := CreateNetworkAndSubscribeFromKeySet(t, ctx, LocalNetOptions{ + ln, _, err := createNetworkAndSubscribe(t, ctx, LocalNetOptions{ Nodes: n, MinConnected: n/2 - 1, UseDiscv5: false, - }, ks, pkHex) + }, pkHex) defer func() { for _, node := range ln.Nodes { @@ -216,3 +220,161 @@ func TestWaitSubsetOfPeers(t *testing.T) { }) } } + +func dummyMsgCommittee(t *testing.T, pkHex string, height int) (spectypes.MessageID, *spectypes.SignedSSVMessage) { + return dummyMsg(t, pkHex, height, spectypes.RoleCommittee) +} + +func dummyMsg(t *testing.T, pkHex string, height int, role spectypes.RunnerRole) (spectypes.MessageID, *spectypes.SignedSSVMessage) { + pk, err := hex.DecodeString(pkHex) + require.NoError(t, err) + id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk, role) + signedMsg := &genesisspecqbft.SignedMessage{ + Message: genesisspecqbft.Message{ + MsgType: genesisspecqbft.CommitMsgType, + Round: 2, + Identifier: id[:], + Height: genesisspecqbft.Height(height), + Root: [32]byte{0x1, 0x2, 0x3}, + }, + Signature: []byte("sVV0fsvqQlqliKv/ussGIatxpe8LDWhc9uoaM5WpjbiYvvxUr1eCpz0ja7UT1PGNDdmoGi6xbMC1g/ozhAt4uCdpy0Xdfqbv"), + Signers: []spectypes.OperatorID{1, 3, 4}, + } + data, err := signedMsg.Encode() + require.NoError(t, err) + ssvMsg := &spectypes.SSVMessage{ + MsgType: spectypes.SSVConsensusMsgType, + MsgID: id, + Data: data, + } + signedSSVMsg, err := spectypes.SSVMessageToSignedSSVMessage(ssvMsg, 1, dummySignSSVMessage) + require.NoError(t, err) + + return id, signedSSVMsg +} + +func dummySignSSVMessage(_ *spectypes.SSVMessage) ([]byte, error) { + return []byte{}, nil +} + +type dummyRouter struct { + count uint64 + i int +} + +func (r *dummyRouter) Route(_ context.Context, _ *queue.DecodedSSVMessage) { + atomic.AddUint64(&r.count, 1) +} + +func createNetworkAndSubscribe(t *testing.T, ctx context.Context, options LocalNetOptions, pks ...string) (*LocalNet, []*dummyRouter, error) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + ln, err := CreateAndStartLocalNet(ctx, logger.Named("createNetworkAndSubscribe"), options) + if err != nil { + return nil, nil, err + } + if len(ln.Nodes) != options.Nodes { + return nil, nil, errors.Errorf("only %d peers created, expected %d", len(ln.Nodes), options.Nodes) + } + + logger.Debug("created local network") + + routers := make([]*dummyRouter, options.Nodes) + for i, node := range ln.Nodes { + routers[i] = &dummyRouter{ + i: i, + } + node.UseMessageRouter(routers[i]) + } + + logger.Debug("subscribing to topics") + + var wg sync.WaitGroup + for _, pk := range pks { + vpk, err := hex.DecodeString(pk) + if err != nil { + return nil, nil, errors.Wrap(err, "could not decode validator public key") + } + for _, node := range ln.Nodes { + wg.Add(1) + go func(node network.P2PNetwork, vpk spectypes.ValidatorPK) { + defer wg.Done() + if err := node.Subscribe(vpk); err != nil { + logger.Warn("could not subscribe to topic", zap.Error(err)) + } + }(node, spectypes.ValidatorPK(vpk)) + } + } + wg.Wait() + // let the nodes subscribe + <-time.After(time.Second) + for _, pk := range pks { + vpk, err := hex.DecodeString(pk) + if err != nil { + return nil, nil, errors.Wrap(err, "could not decode validator public key") + } + for _, node := range ln.Nodes { + peers := make([]peer.ID, 0) + for len(peers) < 2 { + peers, err = node.Peers(spectypes.ValidatorPK(vpk)) + if err != nil { + return nil, nil, err + } + time.Sleep(time.Millisecond * 100) + } + } + } + + return ln, routers, nil +} + +func (n *p2pNetwork) LastDecided(logger *zap.Logger, mid spectypes.MessageID) ([]p2pprotocol.SyncResult, error) { + const ( + minPeers = 3 + waitTime = time.Second * 24 + ) + if !n.isReady() { + return nil, p2pprotocol.ErrNetworkIsNotReady + } + pid, maxPeers := commons.ProtocolID(p2pprotocol.LastDecidedProtocol) + peers, err := waitSubsetOfPeers(logger, n.getSubsetOfPeers, mid.GetDutyExecutorID(), minPeers, maxPeers, waitTime, allPeersFilter) + if err != nil { + return nil, errors.Wrap(err, "could not get subset of peers") + } + return n.makeSyncRequest(logger, peers, mid, pid, &message.SyncMessage{ + Params: &message.SyncParams{ + Identifier: mid, + }, + Protocol: message.LastDecidedType, + }) +} + +func registerHandler(logger *zap.Logger, node network.P2PNetwork, mid spectypes.MessageID, height specqbft.Height, round specqbft.Round, counter *int64, errors chan<- error) { + node.RegisterHandlers(logger, &p2pprotocol.SyncHandler{ + Protocol: p2pprotocol.LastDecidedProtocol, + Handler: func(message *spectypes.SSVMessage) (*spectypes.SSVMessage, error) { + atomic.AddInt64(counter, 1) + sm := genesisspecqbft.SignedMessage{ + Signature: make([]byte, 96), + Signers: []spectypes.OperatorID{1, 2, 3}, + Message: genesisspecqbft.Message{ + MsgType: genesisspecqbft.CommitMsgType, + Height: genesisspecqbft.Height(height), + Round: genesisspecqbft.Round(round), + Identifier: mid[:], + Root: [32]byte{1, 2, 3}, + }, + } + data, err := sm.Encode() + if err != nil { + errors <- err + return nil, err + } + return &spectypes.SSVMessage{ + MsgType: spectypes.SSVConsensusMsgType, + MsgID: mid, + Data: data, + }, nil + }, + }) +} diff --git a/network/p2p/p2p_validation_test.go b/network/p2p/p2p_validation_test.go index 7db166a5f4..a632261ced 100644 --- a/network/p2p/p2p_validation_test.go +++ b/network/p2p/p2p_validation_test.go @@ -5,6 +5,8 @@ import ( cryptorand "crypto/rand" "encoding/hex" "fmt" + "github.com/cornelk/hashmap" + "github.com/libp2p/go-libp2p/core/peer" "math/rand" "os" "sort" @@ -19,7 +21,6 @@ import ( "github.com/stretchr/testify/require" spectypes "github.com/ssvlabs/ssv-spec/types" - spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils" "github.com/ssvlabs/ssv/message/validation" ) @@ -54,12 +55,11 @@ func TestP2pNetwork_MessageValidation(t *testing.T) { ignoredRole = spectypes.RoleAggregator rejectedRole = spectypes.RoleSyncCommitteeContribution ) - messageValidators := CreateMsgValidators(&mtx, nodeCount, vNet) + messageValidators := make([]*MockMessageValidator, nodeCount) // Create a VirtualNet with 4 nodes. - ks := spectestingutils.Testing4SharesSet() vNet = CreateVirtualNet(t, ctx, 4, validators, func(nodeIndex int) validation.MessageValidator { return messageValidators[nodeIndex] - }, ks) + }) defer func() { require.NoError(t, vNet.Close()) }() @@ -204,3 +204,116 @@ func TestP2pNetwork_MessageValidation(t *testing.T) { } defer fmt.Println() } + +type MockMessageValidator struct { + Accepted []int + Ignored []int + Rejected []int + TotalAccepted int + TotalIgnored int + TotalRejected int + + ValidateFunc func(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult +} + +func (v *MockMessageValidator) ValidatorForTopic(topic string) func(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult { + return v.Validate +} + +func (v *MockMessageValidator) Validate(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult { + return v.ValidateFunc(ctx, p, pmsg) +} + +type NodeIndex int + +type VirtualNode struct { + Index NodeIndex + Network *p2pNetwork + PeerScores *hashmap.Map[NodeIndex, *pubsub.PeerScoreSnapshot] +} + +func (n *VirtualNode) Broadcast(msgID spectypes.MessageID, msg *spectypes.SignedSSVMessage) error { + return n.Network.Broadcast(msgID, msg) +} + +// VirtualNet is a utility to create & interact with a virtual network of nodes. +type VirtualNet struct { + Nodes []*VirtualNode +} + +func CreateVirtualNet( + t *testing.T, + ctx context.Context, + nodes int, + validatorPubKeys []string, + messageValidatorProvider func(int) validation.MessageValidator, +) *VirtualNet { + var doneSetup atomic.Bool + vn := &VirtualNet{} + ln, routers, err := createNetworkAndSubscribe(t, ctx, LocalNetOptions{ + Nodes: nodes, + MinConnected: nodes - 1, + UseDiscv5: false, + TotalValidators: 1000, + ActiveValidators: 800, + MyValidators: 300, + MessageValidatorProvider: messageValidatorProvider, + PeerScoreInspector: func(selfPeer peer.ID, peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) { + if !doneSetup.Load() { + return + } + node := vn.NodeByPeerID(selfPeer) + if node == nil { + t.Fatalf("self peer not found (%s)", selfPeer) + } + + node.PeerScores.Range(func(index NodeIndex, snapshot *pubsub.PeerScoreSnapshot) bool { + node.PeerScores.Del(index) + return true + }) + for peerID, peerScore := range peerMap { + peerNode := vn.NodeByPeerID(peerID) + if peerNode == nil { + t.Fatalf("peer not found (%s)", peerID) + } + node.PeerScores.Set(peerNode.Index, peerScore) + } + + }, + PeerScoreInspectorInterval: time.Millisecond * 5, + }, validatorPubKeys...) + + require.NoError(t, err) + require.NotNil(t, routers) + require.NotNil(t, ln) + + for i, node := range ln.Nodes { + vn.Nodes = append(vn.Nodes, &VirtualNode{ + Index: NodeIndex(i), + Network: node.(*p2pNetwork), + PeerScores: hashmap.New[NodeIndex, *pubsub.PeerScoreSnapshot](), //{}make(map[NodeIndex]*pubsub.PeerScoreSnapshot), + }) + } + doneSetup.Store(true) + + return vn +} + +func (vn *VirtualNet) NodeByPeerID(peerID peer.ID) *VirtualNode { + for _, node := range vn.Nodes { + if node.Network.Host().ID() == peerID { + return node + } + } + return nil +} + +func (vn *VirtualNet) Close() error { + for _, node := range vn.Nodes { + err := node.Network.Close() + if err != nil { + return err + } + } + return nil +} diff --git a/network/topics/controller.go b/network/topics/controller.go index 88c629ec19..0901aa90b0 100644 --- a/network/topics/controller.go +++ b/network/topics/controller.go @@ -125,7 +125,7 @@ func (ctrl *topicsCtrl) UpdateScoreParams(logger *zap.Logger) error { continue } if err := topic.SetScoreParams(p); err != nil { - errs = errs + fmt.Sprintf("could not set score params for topic %s: %w; ", topicName, err) + errs = errs + fmt.Sprintf("could not set score params for topic %s: %d; ", topicName, err) continue } } diff --git a/network/topics/controller_test.go b/network/topics/controller_test.go index 1873cc890a..8145703c2c 100644 --- a/network/topics/controller_test.go +++ b/network/topics/controller_test.go @@ -5,6 +5,10 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft" + registrystorage "github.com/ssvlabs/ssv/registry/storage" + "github.com/ssvlabs/ssv/storage/basedb" + "github.com/ssvlabs/ssv/storage/kv" "math" "sync" "sync/atomic" @@ -16,10 +20,10 @@ import ( "github.com/libp2p/go-libp2p/core/host" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + genesisvalidation "github.com/ssvlabs/ssv/message/validation/genesis" "github.com/stretchr/testify/require" "go.uber.org/zap" - specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/message/validation" @@ -27,7 +31,6 @@ import ( "github.com/ssvlabs/ssv/network/commons" "github.com/ssvlabs/ssv/network/discovery" "github.com/ssvlabs/ssv/networkconfig" - "github.com/ssvlabs/ssv/protocol/v2/ssv/queue" ) func TestTopicManager(t *testing.T) { @@ -68,7 +71,7 @@ func TestTopicManager(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - validator := validation.NewMessageValidator(networkconfig.TestNetwork) + validator := genesisvalidation.New(networkconfig.TestNetwork) scoreMap := map[peer.ID]*pubsub.PeerScoreSnapshot{} var scoreMapMu sync.Mutex @@ -372,8 +375,15 @@ func newPeer(ctx context.Context, logger *zap.Logger, t *testing.T, msgValidator ScoreInspectorInterval: 100 * time.Millisecond, // TODO: add mock for peers.ScoreIndex } + db, err := kv.NewInMemory(logger, basedb.Options{}) + require.NoError(t, err) - ps, tm, err := NewPubSub(ctx, logger, cfg, metricsreporter.NewNop()) + _, validatorStore, err := registrystorage.NewSharesStorage(logger, db, []byte("test")) + if err != nil { + t.Fatal(err) + } + + ps, tm, err := NewPubSub(ctx, logger, cfg, metricsreporter.NewNop(), validatorStore) require.NoError(t, err) p = &P{ @@ -417,18 +427,18 @@ func dummyMsg(pkHex string, height int, malformed bool) (*spectypes.SSVMessage, return nil, err } - id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk, spectypes.BNRoleAttester) + id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk, spectypes.RoleCommittee) signature, err := base64.StdEncoding.DecodeString("sVV0fsvqQlqliKv/ussGIatxpe8LDWhc9uoaM5WpjbiYvvxUr1eCpz0ja7UT1PGNDdmoGi6xbMC1g/ozhAt4uCdpy0Xdfqbv2hMf2iRL5ZPKOSmMifHbd8yg4PeeceyN") if err != nil { return nil, err } - signedMessage := specqbft.SignedMessage{ + signedMessage := genesisspecqbft.SignedMessage{ Signature: signature, Signers: []spectypes.OperatorID{1, 3, 4}, - Message: specqbft.Message{ - MsgType: specqbft.RoundChangeMsgType, - Height: specqbft.Height(height), + Message: genesisspecqbft.Message{ + MsgType: genesisspecqbft.RoundChangeMsgType, + Height: genesisspecqbft.Height(height), Round: 2, Identifier: id[:], Root: [32]byte{}, @@ -463,19 +473,6 @@ func (m *DummyMessageValidator) ValidatorForTopic(topic string) func(ctx context } } -func (m *DummyMessageValidator) ValidatePubsubMessage(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult { +func (m *DummyMessageValidator) Validate(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult { return pubsub.ValidationAccept } - -func (m *DummyMessageValidator) ValidateSSVMessage(msg *queue.DecodedSSVMessage) (*queue.DecodedSSVMessage, validation.Descriptor, error) { - var descriptor validation.Descriptor - - validatorPK := msg.SSVMessage.GetID().GetPubKey() - role := msg.SSVMessage.GetID().GetRoleType() - descriptor.Role = role - descriptor.ValidatorPK = validatorPK - - descriptor.SSVMessageType = msg.SSVMessage.GetType() - - return msg, descriptor, nil -} diff --git a/network/topics/msg_validator_test.go b/network/topics/msg_validator_test.go index b5e2c0679d..e80d4fa5ca 100644 --- a/network/topics/msg_validator_test.go +++ b/network/topics/msg_validator_test.go @@ -5,6 +5,8 @@ import ( "crypto" "crypto/rsa" "crypto/sha256" + genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types" + genesisvalidation "github.com/ssvlabs/ssv/message/validation/genesis" "testing" v1 "github.com/attestantio/go-eth2-client/api/v1" @@ -18,7 +20,6 @@ import ( "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils" - "github.com/ssvlabs/ssv/message/validation" "github.com/ssvlabs/ssv/network/commons" "github.com/ssvlabs/ssv/networkconfig" operatorstorage "github.com/ssvlabs/ssv/operator/storage" @@ -40,7 +41,7 @@ func TestMsgValidator(t *testing.T) { ks := spectestingutils.Testing4SharesSet() share := &ssvtypes.SSVShare{ - Share: *spectestingutils.TestingShare(ks), + Share: *spectestingutils.TestingShare(ks, 1), Metadata: ssvtypes.Metadata{ BeaconMetadata: &beaconprotocol.ValidatorMetadata{ Status: v1.ValidatorStateActiveOngoing, @@ -50,7 +51,7 @@ func TestMsgValidator(t *testing.T) { } require.NoError(t, ns.Shares().Save(nil, share)) - mv := validation.NewMessageValidator(networkconfig.TestNetwork, validation.WithNodeStorage(ns)) + mv := genesisvalidation.New(networkconfig.TestNetwork, genesisvalidation.WithNodeStorage(ns)) require.NotNil(t, mv) slot := networkconfig.TestNetwork.Beacon.GetBeaconNetwork().EstimatedCurrentSlot() @@ -86,11 +87,11 @@ func TestMsgValidator(t *testing.T) { signature, err := rsa.SignPKCS1v15(nil, operatorPrivateKey, crypto.SHA256, hash[:]) require.NoError(t, err) - sig := [256]byte{} + var sig []byte copy(sig[:], signature) - packedPubSubMsgPayload := spectypes.EncodeSignedSSVMessage(encodedMsg, operatorId, sig) - topicID := commons.ValidatorTopicID(ssvMsg.GetID().GetPubKey()) + packedPubSubMsgPayload := genesisspectypes.EncodeSignedSSVMessage(encodedMsg, operatorId, sig) + topicID := commons.ValidatorTopicID(ssvMsg.GetID().GetDutyExecutorID()) pmsg := &pubsub.Message{ Message: &pspb.Message{ @@ -98,8 +99,7 @@ func TestMsgValidator(t *testing.T) { Data: packedPubSubMsgPayload, }, } - - res := mv.ValidatePubsubMessage(context.Background(), "16Uiu2HAkyWQyCb6reWXGQeBUt9EXArk6h3aq3PsFMwLNq3pPGH1r", pmsg) + res := mv.Validate(context.Background(), "16Uiu2HAkyWQyCb6reWXGQeBUt9EXArk6h3aq3PsFMwLNq3pPGH1r", pmsg) require.Equal(t, pubsub.ValidationAccept, res) }) @@ -120,7 +120,7 @@ func TestMsgValidator(t *testing.T) { t.Run("empty message", func(t *testing.T) { pmsg := newPBMsg([]byte{}, "xxx", []byte{}) - res := mv.ValidatePubsubMessage(context.Background(), "xxxx", pmsg) + res := mv.Validate(context.Background(), "xxxx", pmsg) require.Equal(t, pubsub.ValidationReject, res) }) @@ -147,9 +147,9 @@ func newPBMsg(data []byte, topic string, from []byte) *pubsub.Message { } func dummySSVConsensusMsg(pk spectypes.ValidatorPK, height qbft.Height) (*spectypes.SSVMessage, error) { - id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk, spectypes.BNRoleAttester) + id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk[:], spectypes.RoleCommittee) ks := spectestingutils.Testing4SharesSet() - validSignedMessage := spectestingutils.TestingRoundChangeMessageWithHeightAndIdentifier(ks.Shares[1], 1, height, id[:]) + validSignedMessage := spectestingutils.TestingRoundChangeMessageWithHeightAndIdentifier(ks.OperatorKeys[1], 1, height, id[:]) encodedSignedMessage, err := validSignedMessage.Encode() if err != nil { diff --git a/network/topics/params/message_rate.go b/network/topics/params/message_rate.go index b5e01bd480..a64f4a07d8 100644 --- a/network/topics/params/message_rate.go +++ b/network/topics/params/message_rate.go @@ -110,7 +110,7 @@ func expectedSingleSCCommitteeDutiesPerEpochCached(numValidators int) float64 { // Calculates the message rate for a topic given its committees' configurations (number of operators and number of validators) func calculateMessageRateForTopic(committees []*storage.Committee) float64 { - if committees == nil || len(committees) == 0 { + if len(committees) == 0 { return 0 } diff --git a/operator/validator/controller.go b/operator/validator/controller.go index f295307df6..b273ee7fa2 100644 --- a/operator/validator/controller.go +++ b/operator/validator/controller.go @@ -188,7 +188,6 @@ type controller struct { committeesObserversMutex sync.Mutex recentlyStartedValidators uint64 - recentlyStartedCommittees uint64 metadataLastUpdated map[spectypes.ValidatorPK]time.Time indicesChange chan struct{} validatorExitCh chan duties.ExitDescriptor @@ -887,11 +886,6 @@ func (c *controller) onShareStop(pubKey spectypes.ValidatorPK) { } } -// todo wrapper to start both validator and committee -type starter interface { - Start() error -} - func (c *controller) onShareInit(share *ssvtypes.SSVShare) (*validator.Validator, *validator.Committee, error) { if !share.HasBeaconMetadata() { // fetching index and status in case not exist c.logger.Warn("skipping validator until it becomes active", fields.PubKey(share.ValidatorPubKey[:])) @@ -931,7 +925,8 @@ func (c *controller) onShareInit(share *ssvtypes.SSVShare) (*validator.Validator if !found { // Share context with both the validator and the runners, // so that when the validator is stopped, the runners are stopped as well. - ctx, _ := context.WithCancel(c.context) + ctx, cancel := context.WithCancel(c.context) + _ = cancel opts := c.validatorOptions opts.SSVShare = share diff --git a/protocol/v2/qbft/instance/instance.go b/protocol/v2/qbft/instance/instance.go index 268ece9730..0606cea113 100644 --- a/protocol/v2/qbft/instance/instance.go +++ b/protocol/v2/qbft/instance/instance.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "encoding/json" "sync" - "time" "github.com/pkg/errors" spectypes "github.com/ssvlabs/ssv-spec/types" @@ -27,7 +26,6 @@ type Instance struct { forceStop bool StartValue []byte - started time.Time metrics *metrics } @@ -37,7 +35,7 @@ func NewInstance( identifier []byte, height specqbft.Height, ) *Instance { - var name = "" + var name string if len(identifier) == 56 { name = spectypes.MessageID(identifier).GetRoleType().String() } else { @@ -62,15 +60,6 @@ func NewInstance( } } -// TODO remove -func messageIDFromBytes(mid []byte) spectypes.MessageID { - if len(mid) < 56 { - return spectypes.MessageID{} - } - - return spectypes.MessageID(mid) -} - func (i *Instance) ForceStop() { i.forceStop = true } diff --git a/protocol/v2/qbft/instance/round_change.go b/protocol/v2/qbft/instance/round_change.go index c69677a957..1cd1d38073 100644 --- a/protocol/v2/qbft/instance/round_change.go +++ b/protocol/v2/qbft/instance/round_change.go @@ -118,7 +118,9 @@ func (i *Instance) uponChangeRoundPartialQuorum(logger *zap.Logger, newRound spe } root, err := specqbft.HashDataRoot(instanceStartValue) - + if err != nil { + return errors.Wrap(err, "failed to hash instance start value") + } logger.Debug("đŸ“ĸ got partial quorum, broadcasting round change message", fields.Round(i.State.Round), fields.Root(root), diff --git a/protocol/v2/qbft/spectest/controller_type.go b/protocol/v2/qbft/spectest/controller_type.go index 94a956146e..9d9bf733e3 100644 --- a/protocol/v2/qbft/spectest/controller_type.go +++ b/protocol/v2/qbft/spectest/controller_type.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/ssvlabs/ssv/exporter/convert" "os" "path/filepath" "reflect" @@ -58,7 +59,7 @@ func RunControllerSpecTest(t *testing.T, test *spectests.ControllerSpecTest) { func generateController(logger *zap.Logger) *controller.Controller { identifier := []byte{1, 2, 3, 4} - config := qbfttesting.TestingConfig(logger, spectestingutils.Testing4SharesSet(), spectypes.RoleCommittee) + config := qbfttesting.TestingConfig(logger, spectestingutils.Testing4SharesSet(), convert.RoleCommittee) return qbfttesting.NewTestingQBFTController( identifier[:], spectestingutils.TestingCommitteeMember(spectestingutils.Testing4SharesSet()), diff --git a/protocol/v2/qbft/spectest/msg_processing_type.go b/protocol/v2/qbft/spectest/msg_processing_type.go index eef4748d15..673212088e 100644 --- a/protocol/v2/qbft/spectest/msg_processing_type.go +++ b/protocol/v2/qbft/spectest/msg_processing_type.go @@ -3,6 +3,7 @@ package qbft import ( "encoding/hex" "fmt" + "github.com/ssvlabs/ssv/exporter/convert" "path/filepath" "reflect" "testing" @@ -31,7 +32,7 @@ func RunMsgProcessing(t *testing.T, test *spectests.MsgProcessingSpecTest) { msgId := specqbft.ControllerIdToMessageID(test.Pre.State.ID) logger := logging.TestLogger(t) pre := instance.NewInstance( - qbfttesting.TestingConfig(logger, spectestingutils.KeySetForCommitteeMember(test.Pre.State.CommitteeMember), msgId.GetRoleType()), + qbfttesting.TestingConfig(logger, spectestingutils.KeySetForCommitteeMember(test.Pre.State.CommitteeMember), convert.RunnerRole(msgId.GetRoleType())), test.Pre.State.CommitteeMember, test.Pre.State.ID, test.Pre.State.Height, diff --git a/protocol/v2/qbft/spectest/qbft_mapping_test.go b/protocol/v2/qbft/spectest/qbft_mapping_test.go index f59faa801c..bac7f00258 100644 --- a/protocol/v2/qbft/spectest/qbft_mapping_test.go +++ b/protocol/v2/qbft/spectest/qbft_mapping_test.go @@ -2,6 +2,7 @@ package qbft import ( "encoding/json" + "github.com/ssvlabs/ssv/exporter/convert" "os" "reflect" "strings" @@ -9,7 +10,6 @@ import ( spectests "github.com/ssvlabs/ssv-spec/qbft/spectest/tests" "github.com/ssvlabs/ssv-spec/qbft/spectest/tests/timeout" - spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv-spec/types/testingutils" "github.com/stretchr/testify/require" @@ -100,7 +100,7 @@ func TestQBFTMapping(t *testing.T) { preByts, _ := typedTest.Pre.Encode() logger := logging.TestLogger(t) pre := instance.NewInstance( - testing2.TestingConfig(logger, testingutils.KeySetForCommitteeMember(typedTest.Pre.State.CommitteeMember), spectypes.RoleCommittee), + testing2.TestingConfig(logger, testingutils.KeySetForCommitteeMember(typedTest.Pre.State.CommitteeMember), convert.RoleCommittee), typedTest.Pre.State.CommitteeMember, typedTest.Pre.State.ID, typedTest.Pre.State.Height, diff --git a/protocol/v2/qbft/testing/utils.go b/protocol/v2/qbft/testing/utils.go index f9dcaa016e..de0adcd2ba 100644 --- a/protocol/v2/qbft/testing/utils.go +++ b/protocol/v2/qbft/testing/utils.go @@ -15,7 +15,7 @@ import ( "github.com/ssvlabs/ssv/protocol/v2/qbft/controller" ) -var TestingConfig = func(logger *zap.Logger, keySet *testingutils.TestKeySet, role types.RunnerRole) *qbft.Config { +var TestingConfig = func(logger *zap.Logger, keySet *testingutils.TestKeySet, role convert.RunnerRole) *qbft.Config { return &qbft.Config{ BeaconSigner: testingutils.NewTestingKeyManager(), OperatorSigner: testingutils.NewTestingOperatorSigner(keySet, 1), @@ -35,7 +35,7 @@ var TestingConfig = func(logger *zap.Logger, keySet *testingutils.TestKeySet, ro ProposerF: func(state *specqbft.State, round specqbft.Round) types.OperatorID { return 1 }, - Storage: TestingStores(logger).Get(convert.RunnerRole(role)), + Storage: TestingStores(logger).Get(role), Network: testingutils.NewTestingNetwork(1, keySet.OperatorKeys[1]), Timer: roundtimer.NewTestingTimer(), SignatureVerification: true, diff --git a/protocol/v2/ssv/runner/committee.go b/protocol/v2/ssv/runner/committee.go index 528d2bd5a6..435d692406 100644 --- a/protocol/v2/ssv/runner/committee.go +++ b/protocol/v2/ssv/runner/committee.go @@ -39,7 +39,6 @@ type CommitteeRunner struct { beacon beacon.BeaconNode signer types.BeaconSigner operatorSigner types.OperatorSigner - domain spectypes.DomainType valCheck specqbft.ProposedValueCheckF stoppedValidators map[spectypes.ValidatorPK]struct{} @@ -567,8 +566,11 @@ func (cr *CommitteeRunner) expectedPostConsensusRootsAndBeaconObjects() ( return nil, nil, nil, errors.Wrap(err, "could not decode beacon vote") } for _, beaconDuty := range duty.(*types.CommitteeDuty).BeaconDuties { + if beaconDuty == nil { + continue + } _, stopped := cr.stoppedValidators[spectypes.ValidatorPK(beaconDuty.PubKey)] - if beaconDuty == nil || stopped { + if stopped { continue } slot := beaconDuty.DutySlot() diff --git a/protocol/v2/ssv/runner/sync_committee_aggregator.go b/protocol/v2/ssv/runner/sync_committee_aggregator.go index 1a02e4f162..0d3774570c 100644 --- a/protocol/v2/ssv/runner/sync_committee_aggregator.go +++ b/protocol/v2/ssv/runner/sync_committee_aggregator.go @@ -228,6 +228,9 @@ func (r *SyncCommitteeAggregatorRunner) ProcessPostConsensus(logger *zap.Logger, r.metrics.EndPostConsensus() consensusData, err := spectypes.CreateConsensusData(r.GetState().DecidedValue) + if err != nil { + return errors.Wrap(err, "failed to decode decided value") + } // get contributions contributions, err := consensusData.GetSyncCommitteeContributions() diff --git a/protocol/v2/ssv/spectest/debug_states.go b/protocol/v2/ssv/spectest/debug_states.go index cc639b4d76..efd80cfa7b 100644 --- a/protocol/v2/ssv/spectest/debug_states.go +++ b/protocol/v2/ssv/spectest/debug_states.go @@ -49,6 +49,6 @@ func dumpState(t *testing.T, func logJSON(t *testing.T, name string, value interface{}) { bytes, err := json.Marshal(value) require.NoError(t, err) - err = os.WriteFile(fmt.Sprintf("%s/%s_test_serialized.json", dumpDir, name), bytes, 0644) + err = os.WriteFile(fmt.Sprintf("%s/%s_test_serialized.json", dumpDir, name), bytes, 0600) require.NoError(t, err) } diff --git a/protocol/v2/ssv/spectest/msg_processing_type.go b/protocol/v2/ssv/spectest/msg_processing_type.go index 288c52d4c3..0cd0c01292 100644 --- a/protocol/v2/ssv/spectest/msg_processing_type.go +++ b/protocol/v2/ssv/spectest/msg_processing_type.go @@ -134,7 +134,7 @@ func (test *MsgProcessingSpecTest) RunAsPartOfMultiTest(t *testing.T, logger *za } network := &spectestingutils.TestingNetwork{} - beaconNetwork := tests.NewTestingBeaconNodeWrapped() + var beaconNetwork *tests.TestingBeaconNodeWrapped var committee []*spectypes.Operator switch test.Runner.(type) { @@ -150,14 +150,14 @@ func (test *MsgProcessingSpecTest) RunAsPartOfMultiTest(t *testing.T, logger *za default: network = v.Network.(*spectestingutils.TestingNetwork) committee = v.Operator.Committee - beaconNetwork = test.Runner.GetBeaconNode() + beaconNetwork = test.Runner.GetBeaconNode().(*tests.TestingBeaconNodeWrapped) } // test output message spectestingutils.ComparePartialSignatureOutputMessages(t, test.OutputMessages, network.BroadcastedMsgs, committee) // test beacon broadcasted msgs - spectestingutils.CompareBroadcastedBeaconMsgs(t, test.BeaconBroadcastedRoots, beaconNetwork.(*tests.TestingBeaconNodeWrapped).GetBroadcastedRoots()) + spectestingutils.CompareBroadcastedBeaconMsgs(t, test.BeaconBroadcastedRoots, beaconNetwork.GetBroadcastedRoots()) // post root postRoot, err := test.Runner.GetRoot() @@ -169,20 +169,20 @@ func (test *MsgProcessingSpecTest) RunAsPartOfMultiTest(t *testing.T, logger *za } } -func (test *MsgProcessingSpecTest) compareBroadcastedBeaconMsgs(t *testing.T) { - broadcastedRoots := test.Runner.GetBeaconNode().(*tests.TestingBeaconNodeWrapped).GetBroadcastedRoots() - require.Len(t, broadcastedRoots, len(test.BeaconBroadcastedRoots)) - for _, r1 := range test.BeaconBroadcastedRoots { - found := false - for _, r2 := range broadcastedRoots { - if r1 == hex.EncodeToString(r2[:]) { - found = true - break - } - } - require.Truef(t, found, "broadcasted beacon root not found") - } -} +//func (test *MsgProcessingSpecTest) compareBroadcastedBeaconMsgs(t *testing.T) { +// broadcastedRoots := test.Runner.GetBeaconNode().(*tests.TestingBeaconNodeWrapped).GetBroadcastedRoots() +// require.Len(t, broadcastedRoots, len(test.BeaconBroadcastedRoots)) +// for _, r1 := range test.BeaconBroadcastedRoots { +// found := false +// for _, r2 := range broadcastedRoots { +// if r1 == hex.EncodeToString(r2[:]) { +// found = true +// break +// } +// } +// require.Truef(t, found, "broadcasted beacon root not found") +// } +//} func (test *MsgProcessingSpecTest) overrideStateComparison(t *testing.T) { testType := reflect.TypeOf(test).String() @@ -240,7 +240,7 @@ var baseCommitteeWithRunnerSample = func( shareMap[valIdx] = spectestingutils.TestingShare(ks, valIdx) } - createRunnerF := func(_ phase0.Slot, shareMap map[phase0.ValidatorIndex]*spectypes.Share, slashableValidators []spectypes.ShareValidatorPK) *runner.CommitteeRunner { + createRunnerF := func(_ phase0.Slot, shareMap map[phase0.ValidatorIndex]*spectypes.Share, _ []spectypes.ShareValidatorPK) *runner.CommitteeRunner { return runner.NewCommitteeRunner( networkconfig.TestNetwork, shareMap, diff --git a/protocol/v2/ssv/spectest/ssv_mapping_test.go b/protocol/v2/ssv/spectest/ssv_mapping_test.go index 0dc530be25..48b3fe115e 100644 --- a/protocol/v2/ssv/spectest/ssv_mapping_test.go +++ b/protocol/v2/ssv/spectest/ssv_mapping_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/ssvlabs/ssv/exporter/convert" tests2 "github.com/ssvlabs/ssv/integration/qbft/tests" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/protocol/v2/qbft/controller" @@ -389,7 +390,7 @@ func fixRunnerForRun(t *testing.T, runnerMap map[string]interface{}, ks *spectes } func fixControllerForRun(t *testing.T, logger *zap.Logger, runner runner.Runner, contr *controller.Controller, ks *spectestingutils.TestKeySet) *controller.Controller { - config := qbfttesting.TestingConfig(logger, ks, spectypes.RoleCommittee) + config := qbfttesting.TestingConfig(logger, ks, convert.RoleCommittee) config.ValueCheckF = runner.GetValCheckF() newContr := controller.NewController( contr.Identifier, @@ -550,7 +551,7 @@ func fixCommitteeForRun(t *testing.T, ctx context.Context, logger *zap.Logger, c tests2.NewTestingBeaconNodeWrapped().GetBeaconNetwork(), &specCommittee.CommitteeMember, testingutils.NewTestingVerifier(), - func(slot phase0.Slot, shareMap map[phase0.ValidatorIndex]*spectypes.Share, slashableValidators []spectypes.ShareValidatorPK) *runner.CommitteeRunner { + func(slot phase0.Slot, shareMap map[phase0.ValidatorIndex]*spectypes.Share, _ []spectypes.ShareValidatorPK) *runner.CommitteeRunner { return ssvtesting.CommitteeRunnerWithShareMap(logger, shareMap).(*runner.CommitteeRunner) }, ) diff --git a/protocol/v2/ssv/testing/runner.go b/protocol/v2/ssv/testing/runner.go index 72a078f1d9..0742895bb7 100644 --- a/protocol/v2/ssv/testing/runner.go +++ b/protocol/v2/ssv/testing/runner.go @@ -2,6 +2,7 @@ package testing import ( "bytes" + "github.com/ssvlabs/ssv/exporter/convert" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ssvlabs/ssv/integration/qbft/tests" @@ -77,13 +78,13 @@ var baseRunner = func( keySet *spectestingutils.TestKeySet, ) runner.Runner { share := spectestingutils.TestingShare(keySet, spectestingutils.TestingValidatorIndex) - identifier := spectypes.NewMsgID(TestingSSVDomainType, spectestingutils.TestingValidatorPubKey[:], spectypes.RunnerRole(role)) + identifier := spectypes.NewMsgID(TestingSSVDomainType, spectestingutils.TestingValidatorPubKey[:], role) net := spectestingutils.NewTestingNetwork(1, keySet.OperatorKeys[1]) km := spectestingutils.NewTestingKeyManager() operator := spectestingutils.TestingCommitteeMember(keySet) opSigner := spectestingutils.NewTestingOperatorSigner(keySet, 1) - config := testing.TestingConfig(logger, keySet, identifier.GetRoleType()) + config := testing.TestingConfig(logger, keySet, convert.RunnerRole(identifier.GetRoleType())) config.ValueCheckF = valCheck config.ProposerF = func(state *specqbft.State, round specqbft.Round) spectypes.OperatorID { return 1 @@ -114,7 +115,7 @@ var baseRunner = func( km, opSigner, valCheck, - ).(runner.Runner) + ) case spectypes.RoleAggregator: return runner.NewAggregatorRunner( networkconfig.TestNetwork, @@ -127,7 +128,7 @@ var baseRunner = func( opSigner, valCheck, TestingHighestDecidedSlot, - ).(runner.Runner) + ) case spectypes.RoleProposer: return runner.NewProposerRunner( networkconfig.TestNetwork, @@ -140,7 +141,7 @@ var baseRunner = func( opSigner, valCheck, TestingHighestDecidedSlot, - ).(runner.Runner) + ) case spectypes.RoleSyncCommitteeContribution: return runner.NewSyncCommitteeAggregatorRunner( networkconfig.TestNetwork, @@ -153,7 +154,7 @@ var baseRunner = func( opSigner, valCheck, TestingHighestDecidedSlot, - ).(runner.Runner) + ) case spectypes.RoleValidatorRegistration: return runner.NewValidatorRegistrationRunner( networkconfig.TestNetwork, @@ -164,7 +165,7 @@ var baseRunner = func( net, km, opSigner, - ).(runner.Runner) + ) case spectypes.RoleVoluntaryExit: return runner.NewVoluntaryExitRunner( networkconfig.TestNetwork, @@ -174,7 +175,7 @@ var baseRunner = func( net, km, opSigner, - ).(runner.Runner) + ) case spectestingutils.UnknownDutyType: ret := runner.NewCommitteeRunner( networkconfig.TestNetwork, @@ -187,7 +188,7 @@ var baseRunner = func( valCheck, ) ret.(*runner.CommitteeRunner).BaseRunner.RunnerRoleType = spectestingutils.UnknownDutyType - return ret.(runner.Runner) + return ret default: panic("unknown role type") } @@ -292,7 +293,7 @@ var baseRunnerWithShareMap = func( committeeMember := spectestingutils.TestingCommitteeMember(keySetInstance) opSigner := spectestingutils.NewTestingOperatorSigner(keySetInstance, committeeMember.OperatorID) - config := testing.TestingConfig(logger, keySetInstance, identifier.GetRoleType()) + config := testing.TestingConfig(logger, keySetInstance, convert.RunnerRole(identifier.GetRoleType())) config.ValueCheckF = valCheck config.ProposerF = func(state *specqbft.State, round specqbft.Round) spectypes.OperatorID { return 1 diff --git a/protocol/v2/ssv/validator/committee.go b/protocol/v2/ssv/validator/committee.go index a0675d482a..f3b8e3cd9b 100644 --- a/protocol/v2/ssv/validator/committee.go +++ b/protocol/v2/ssv/validator/committee.go @@ -101,21 +101,20 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty) } slashableValidators := make([]spectypes.ShareValidatorPK, 0, len(duty.BeaconDuties)) - - validatorShares := make(map[phase0.ValidatorIndex]*spectypes.Share, len(duty.BeaconDuties)) - toRemove := make([]int, 0) + //validatorShares := make(map[phase0.ValidatorIndex]*spectypes.Share, len(duty.BeaconDuties)) + //toRemove := make([]int, 0) // Remove beacon duties that don't have a share - for i, bd := range duty.BeaconDuties { - share, ok := c.Shares[bd.ValidatorIndex] - if !ok { - toRemove = append(toRemove, i) - continue - } - if bd.Type == spectypes.BNRoleAttester { - slashableValidators = append(slashableValidators, share.SharePubKey) - } - validatorShares[bd.ValidatorIndex] = share - } + //for i, bd := range duty.BeaconDuties { + // share, ok := c.Shares[bd.ValidatorIndex] + // if !ok { + // toRemove = append(toRemove, i) + // continue + // } + // if bd.Type == spectypes.BNRoleAttester { + // slashableValidators = append(slashableValidators, share.SharePubKey) + // } + // validatorShares[bd.ValidatorIndex] = share + //} // TODO bring this back when https://github.com/ssvlabs/ssv-spec/pull/467 is merged and spec is aligned //// Remove beacon duties that don't have a share @@ -135,6 +134,15 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty) // TODO REMOVE this after https://github.com/ssvlabs/ssv-spec/pull/467 is merged and we are aligned to the spec // and pas validatorShares instead of sharesCopy the runner // --> + for _, bd := range duty.BeaconDuties { + share, ok := c.Shares[bd.ValidatorIndex] + if !ok { + continue + } + if bd.Type == spectypes.BNRoleAttester { + slashableValidators = append(slashableValidators, share.SharePubKey) + } + } var sharesCopy = make(map[phase0.ValidatorIndex]*spectypes.Share, len(c.Shares)) for k, v := range c.Shares { sharesCopy[k] = v @@ -159,7 +167,12 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty) logger = c.logger.With(fields.DutyID(fields.FormatCommitteeDutyID(c.Operator.Committee, c.BeaconNetwork.EstimatedEpochAtSlot(duty.Slot), duty.Slot)), fields.Slot(duty.Slot)) // TODO alan: stop queue - go c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, c.Runners[duty.Slot]) + go func() { + err := c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, c.Runners[duty.Slot]) + if err != nil { + logger.Warn("handles error message", zap.Error(err)) + } + }() logger.Info("ℹī¸ starting duty processing") return c.Runners[duty.Slot].StartNewDuty(logger, duty, c.Operator.GetQuorum()) @@ -333,21 +346,6 @@ func (c *Committee) UnmarshalJSON(data []byte) error { return nil } -// updateAttestingSlotMap updates the highest attesting slot map from beacon duties -func (c *Committee) updateAttestingSlotMap(duty *spectypes.CommitteeDuty) { - for _, beaconDuty := range duty.BeaconDuties { - if beaconDuty.Type == spectypes.BNRoleAttester { - validatorPK := spectypes.ValidatorPK(beaconDuty.PubKey) - if _, ok := c.HighestAttestingSlotMap[validatorPK]; !ok { - c.HighestAttestingSlotMap[validatorPK] = beaconDuty.Slot - } - if c.HighestAttestingSlotMap[validatorPK] < beaconDuty.Slot { - c.HighestAttestingSlotMap[validatorPK] = beaconDuty.Slot - } - } - } -} - func (c *Committee) validateMessage(msg *spectypes.SSVMessage) error { if !(c.Operator.CommitteeID.MessageIDBelongs(msg.GetID())) { return errors.New("msg ID doesn't match committee ID") diff --git a/protocol/v2/ssv/validator/non_committee_validator.go b/protocol/v2/ssv/validator/non_committee_validator.go index b7a9f5b7d0..502041d5ca 100644 --- a/protocol/v2/ssv/validator/non_committee_validator.go +++ b/protocol/v2/ssv/validator/non_committee_validator.go @@ -83,7 +83,7 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.DecodedSSVMessage) error return fmt.Errorf("failed to get partial signature message from network message %w", err) } if partialSigMessages.Type != spectypes.PostConsensusPartialSig { - return fmt.Errorf("not processing message type %s", partialSigMessages.Type) + return fmt.Errorf("not processing message type %d", partialSigMessages.Type) } slot := partialSigMessages.Slot diff --git a/protocol/v2/testing/test_utils.go b/protocol/v2/testing/test_utils.go index 0df2e3aa0c..a1cdf42c28 100644 --- a/protocol/v2/testing/test_utils.go +++ b/protocol/v2/testing/test_utils.go @@ -114,17 +114,27 @@ func GetSSVMappingSpecTestJSON(path string, module string) ([]byte, error) { gzPath := filepath.Join(p, "spectest", "generate", "tests.json.gz") untypedTests := map[string]interface{}{} - file, err := os.Open(gzPath) + file, err := os.Open(gzPath) // #nosec G304 if err != nil { return nil, errors.Wrap(err, "failed to open gzip file") } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + // Handle the error, log it, or handle it as appropriate + log.Printf("Failed to close file: %v", err) + } + }() gzipReader, err := gzip.NewReader(file) if err != nil { return nil, errors.Wrap(err, "failed to create gzip reader") } - defer gzipReader.Close() + defer func() { + if err := gzipReader.Close(); err != nil { + // Handle the error, log it, or handle it as appropriate + log.Printf("Failed to close reader: %v", err) + } + }() decompressedData, err := io.ReadAll(gzipReader) if err != nil { @@ -253,7 +263,7 @@ func ExtractTarGz(gzipStream io.Reader) { tarReader := tar.NewReader(uncompressedStream) - for true { + for { header, err := tarReader.Next() if err == io.EOF { @@ -266,7 +276,7 @@ func ExtractTarGz(gzipStream io.Reader) { switch header.Typeflag { case tar.TypeDir: - if err := os.Mkdir(header.Name, 0755); err != nil { + if err := os.Mkdir(header.Name, 0750); err != nil { log.Fatalf("ExtractTarGz: Mkdir() failed: %s", err.Error()) } case tar.TypeReg: @@ -274,27 +284,27 @@ func ExtractTarGz(gzipStream io.Reader) { if err != nil { log.Fatalf("ExtractTarGz: Create() failed: %s", err.Error()) } - if _, err := io.Copy(outFile, tarReader); err != nil { + // Set a maximum size limit for the decompressed data + maxSize := int64(50 * 1024 * 1024) // 50 MB, adjust as needed + + // Wrap the tarReader with a LimitedReader + limitedReader := &io.LimitedReader{R: tarReader, N: maxSize} + + // Perform the copy operation with the limited reader + if _, err := io.Copy(outFile, limitedReader); err != nil { log.Fatalf("ExtractTarGz: Copy() failed: %s", err.Error()) } - outFile.Close() + err = outFile.Close() + if err != nil { + log.Fatalf("faild to close file: %s", err.Error()) + } default: log.Fatalf( - "ExtractTarGz: uknown type: %s in %s", + "ExtractTarGz: uknown type: %d in %s", header.Typeflag, header.Name) } } } - -func unpackTestsJson(path string) error { - r, err := os.Open(fmt.Sprintf("%s.gz", path)) - if err != nil { - errors.Wrap(err, "could not open file") - } - ExtractTarGz(r) - - return nil -} From ec363f8dc6f11a6ae4a0c1b0fcc70de2a0d470cc Mon Sep 17 00:00:00 2001 From: rehs0y Date: Tue, 23 Jul 2024 18:27:57 +0000 Subject: [PATCH 5/5] fix: (alan) don't submit empty attestation or sc messages (#1501) * don't send empty att/sc * assign new var inside ifs instead setting existing --- protocol/v2/ssv/runner/committee.go | 58 ++++++++++++++++------------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/protocol/v2/ssv/runner/committee.go b/protocol/v2/ssv/runner/committee.go index 435d692406..7b981d4b45 100644 --- a/protocol/v2/ssv/runner/committee.go +++ b/protocol/v2/ssv/runner/committee.go @@ -418,44 +418,50 @@ func (cr *CommitteeRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *t } } } + logger = logger.With(durationFields...) // Submit multiple attestations - attestations := make([]*phase0.Attestation, 0) + attestations := make([]*phase0.Attestation, 0, len(attestationsToSubmit)) for _, att := range attestationsToSubmit { attestations = append(attestations, att) } - submmitionStart := time.Now() - if err := cr.beacon.SubmitAttestations(attestations); err != nil { - logger.Error("❌ failed to submit attestation", zap.Error(err)) - return errors.Wrap(err, "could not submit to Beacon chain reconstructed attestation") - } - logger.Info("✅ successfully submitted attestations", - fields.SubmissionTime(time.Since(submmitionStart)), - fields.Height(cr.BaseRunner.QBFTController.Height), - fields.Round(cr.BaseRunner.State.RunningInstance.State.Round)) - // Record successful submissions - for validator := range attestationsToSubmit { - cr.RecordSubmission(types.BNRoleAttester, validator) + if len(attestations) > 0 { + submissionStart := time.Now() + if err := cr.beacon.SubmitAttestations(attestations); err != nil { + logger.Error("❌ failed to submit attestation", zap.Error(err)) + return errors.Wrap(err, "could not submit to Beacon chain reconstructed attestation") + } + + logger.Info("✅ successfully submitted attestations", + fields.SubmissionTime(time.Since(submissionStart)), + fields.Height(cr.BaseRunner.QBFTController.Height), + fields.Round(cr.BaseRunner.State.RunningInstance.State.Round)) + // Record successful submissions + for validator := range attestationsToSubmit { + cr.RecordSubmission(types.BNRoleAttester, validator) + } } // Submit multiple sync committee - syncCommitteeMessages := make([]*altair.SyncCommitteeMessage, 0) + syncCommitteeMessages := make([]*altair.SyncCommitteeMessage, 0, len(syncCommitteeMessagesToSubmit)) for _, syncMsg := range syncCommitteeMessagesToSubmit { syncCommitteeMessages = append(syncCommitteeMessages, syncMsg) } - submmitionStart = time.Now() - if err := cr.beacon.SubmitSyncMessages(syncCommitteeMessages); err != nil { - logger.Error("❌ failed to submit sync committee", zap.Error(err)) - return errors.Wrap(err, "could not submit to Beacon chain reconstructed signed sync committee") - } - logger.Info("✅ successfully submitted sync committee", - fields.SubmissionTime(time.Since(submmitionStart)), - fields.Height(cr.BaseRunner.QBFTController.Height), - fields.Round(cr.BaseRunner.State.RunningInstance.State.Round)) - // Record successful submissions - for validator := range syncCommitteeMessagesToSubmit { - cr.RecordSubmission(types.BNRoleSyncCommittee, validator) + if len(syncCommitteeMessages) > 0 { + submissionStart := time.Now() + if err := cr.beacon.SubmitSyncMessages(syncCommitteeMessages); err != nil { + logger.Error("❌ failed to submit sync committee", zap.Error(err)) + return errors.Wrap(err, "could not submit to Beacon chain reconstructed signed sync committee") + } + logger.Info("✅ successfully submitted sync committee", + fields.SubmissionTime(time.Since(submissionStart)), + fields.Height(cr.BaseRunner.QBFTController.Height), + fields.Round(cr.BaseRunner.State.RunningInstance.State.Round)) + // Record successful submissions + for validator := range syncCommitteeMessagesToSubmit { + cr.RecordSubmission(types.BNRoleSyncCommittee, validator) + } } if anyErr != nil {