diff --git a/Makefile b/Makefile index ead718961a..5f43206366 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ COV_CMD="-cover" ifeq ($(COVERAGE),true) COV_CMD=-coverpkg=./... -covermode="atomic" -coverprofile="coverage.out" endif -UNFORMATTED=$(shell gofmt -s -l .) +UNFORMATTED=$(shell gofmt -l .) #Lint .PHONY: lint-prepare diff --git a/cli/bootnode/boot_node.go b/cli/bootnode/boot_node.go index 3e93e0a2d4..2ba2644833 100644 --- a/cli/bootnode/boot_node.go +++ b/cli/bootnode/boot_node.go @@ -2,9 +2,10 @@ package bootnode import ( "fmt" - "github.com/ssvlabs/ssv/utils/commons" "log" + "github.com/ssvlabs/ssv/utils/commons" + "github.com/ssvlabs/ssv/logging" "github.com/ilyakaznacheev/cleanenv" diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index 2aa84c5d8b..d6ea863c8c 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -277,13 +277,11 @@ func (eh *EventHandler) validatorAddedEventToShare( selfOperatorID := eh.operatorDataStore.GetOperatorID() var shareSecret *bls.SecretKey - operators := make([]*spectypes.Operator, 0) - committee := make([]*spectypes.CommitteeMember, 0) shareMembers := make([]*spectypes.ShareMember, 0) for i := range event.OperatorIds { operatorID := event.OperatorIds[i] - od, found, err := eh.nodeStorage.GetOperatorData(txn, operatorID) + _, found, err := eh.nodeStorage.GetOperatorData(txn, operatorID) if err != nil { return nil, nil, fmt.Errorf("could not get operator data: %w", err) } @@ -293,11 +291,6 @@ func (eh *EventHandler) validatorAddedEventToShare( } } - committee = append(committee, &spectypes.CommitteeMember{ - OperatorID: operatorID, - SSVOperatorPubKey: od.PublicKey, - }) - shareMembers = append(shareMembers, &spectypes.ShareMember{ Signer: operatorID, SharePubKey: sharePublicKeys[i], @@ -327,11 +320,6 @@ func (eh *EventHandler) validatorAddedEventToShare( Err: errors.New("share private key does not match public key"), } } - - operators = append(operators, &spectypes.Operator{ - OperatorID: operatorID, - SSVOperatorPubKey: od.PublicKey, - }) } validatorShare.DomainType = eh.networkConfig.DomainType() diff --git a/ibft/storage/stores_test.go b/ibft/storage/stores_test.go index 46e93d1dfa..0fc9c640ee 100644 --- a/ibft/storage/stores_test.go +++ b/ibft/storage/stores_test.go @@ -36,9 +36,10 @@ func TestQBFTStores(t *testing.T) { id := []byte{1, 2, 3} - qbftMap.Each(func(role convert.RunnerRole, store qbftstorage.QBFTStore) error { + err = qbftMap.Each(func(role convert.RunnerRole, store qbftstorage.QBFTStore) error { return store.SaveInstance(&qbftstorage.StoredInstance{State: &specqbft.State{Height: 1, ID: id}}) }) + require.NoError(t, err) instance, err := qbftMap.Get(convert.RoleCommittee).GetInstance(id, 1) require.NoError(t, err) diff --git a/integration/qbft/tests/scenario_test.go b/integration/qbft/tests/scenario_test.go index f534bfeabb..e30f353338 100644 --- a/integration/qbft/tests/scenario_test.go +++ b/integration/qbft/tests/scenario_test.go @@ -2,6 +2,7 @@ package tests import ( "context" + "github.com/ssvlabs/ssv/exporter/convert" "testing" "time" @@ -81,12 +82,13 @@ func (s *Scenario) Run(t *testing.T, role spectypes.BeaconRole) { //validating state of validator after invoking duties for id, validationFunc := range s.ValidationFunctions { - identifier := spectypes.NewMsgID(networkconfig.TestNetwork.Domain, getKeySet(s.Committee).ValidatorPK.Serialize(), spectypes.MapDutyToRunnerRole(role)) + identifier := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), getKeySet(s.Committee).ValidatorPK.Serialize(), spectypes.MapDutyToRunnerRole(role)) //getting stored state of validator var storedInstance *protocolstorage.StoredInstance for { + role := convert.MessageIDFromBytes(identifier[:]).GetRoleType() var err error - storedInstance, err = s.validators[id].Storage.Get(spectypes.MessageIDFromBytes(identifier[:]).GetRoleType()).GetHighestInstance(identifier[:]) + storedInstance, err = s.validators[id].Storage.Get(role).GetHighestInstance(identifier[:]) require.NoError(t, err) if storedInstance != nil { @@ -143,7 +145,6 @@ func testingShare(keySet *spectestingutils.TestKeySet, id spectypes.OperatorID) ValidatorPubKey: spectypes.ValidatorPK(keySet.ValidatorPK.Serialize()), SharePubKey: keySet.Shares[id].GetPublicKey().Serialize(), DomainType: testingutils.TestingSSVDomainType, - Quorum: keySet.Threshold, Committee: keySet.Committee(), } } @@ -160,17 +161,16 @@ func newStores(logger *zap.Logger) *qbftstorage.QBFTStores { storageMap := qbftstorage.NewStores() - roles := []spectypes.BeaconRole{ - spectypes.BNRoleAttester, - spectypes.BNRoleProposer, - spectypes.BNRoleAggregator, - spectypes.BNRoleSyncCommittee, - spectypes.BNRoleSyncCommitteeContribution, - spectypes.BNRoleValidatorRegistration, - spectypes.BNRoleVoluntaryExit, + roles := []convert.RunnerRole{ + convert.RoleCommittee, + convert.RoleProposer, + convert.RoleAggregator, + convert.RoleSyncCommitteeContribution, + convert.RoleValidatorRegistration, + convert.RoleVoluntaryExit, } for _, role := range roles { - storageMap.Add(spectypes.MapDutyToRunnerRole(role), qbftstorage.New(db, role.String())) + storageMap.Add(role, qbftstorage.New(db, role.String())) } return storageMap @@ -200,7 +200,7 @@ func createValidator(t *testing.T, pCtx context.Context, id spectypes.OperatorID Liquidated: false, }, }, - Beacon: spectestingutils.NewTestingBeaconNode(), + Beacon: NewTestingBeaconNodeWrapped(), Signer: km, } diff --git a/integration/qbft/tests/temp_testing_beacon_network.go b/integration/qbft/tests/temp_testing_beacon_network.go index c9a1c44c19..73160aaf46 100644 --- a/integration/qbft/tests/temp_testing_beacon_network.go +++ b/integration/qbft/tests/temp_testing_beacon_network.go @@ -14,73 +14,73 @@ import ( type TestingBeaconNodeWrapped struct { beacon.BeaconNode - bn *spectestingutils.TestingBeaconNode + Bn *spectestingutils.TestingBeaconNode } func (bn *TestingBeaconNodeWrapped) SetSyncCommitteeAggregatorRootHexes(roots map[string]bool) { - bn.bn.SetSyncCommitteeAggregatorRootHexes(roots) + bn.Bn.SetSyncCommitteeAggregatorRootHexes(roots) } func (bn *TestingBeaconNodeWrapped) GetBroadcastedRoots() []phase0.Root { - return bn.bn.BroadcastedRoots + return bn.Bn.BroadcastedRoots } func (bn *TestingBeaconNodeWrapped) GetBeaconNode() *spectestingutils.TestingBeaconNode { - return bn.bn + return bn.Bn } func (bn *TestingBeaconNodeWrapped) GetAttestationData(slot phase0.Slot, committeeIndex phase0.CommitteeIndex) (*phase0.AttestationData, spec.DataVersion, error) { - return bn.bn.GetAttestationData(slot, committeeIndex) + return bn.Bn.GetAttestationData(slot, committeeIndex) } func (bn *TestingBeaconNodeWrapped) DomainData(epoch phase0.Epoch, domain phase0.DomainType) (phase0.Domain, error) { - return bn.bn.DomainData(epoch, domain) + return bn.Bn.DomainData(epoch, domain) } func (bn *TestingBeaconNodeWrapped) SyncCommitteeSubnetID(index phase0.CommitteeIndex) (uint64, error) { - return bn.bn.SyncCommitteeSubnetID(index) + return bn.Bn.SyncCommitteeSubnetID(index) } func (bn *TestingBeaconNodeWrapped) IsSyncCommitteeAggregator(proof []byte) (bool, error) { - return bn.bn.IsSyncCommitteeAggregator(proof) + return bn.Bn.IsSyncCommitteeAggregator(proof) } func (bn *TestingBeaconNodeWrapped) GetSyncCommitteeContribution(slot phase0.Slot, selectionProofs []phase0.BLSSignature, subnetIDs []uint64) (ssz.Marshaler, spec.DataVersion, error) { - return bn.bn.GetSyncCommitteeContribution(slot, selectionProofs, subnetIDs) + return bn.Bn.GetSyncCommitteeContribution(slot, selectionProofs, subnetIDs) } func (bn *TestingBeaconNodeWrapped) SubmitAggregateSelectionProof(slot phase0.Slot, committeeIndex phase0.CommitteeIndex, committeeLength uint64, index phase0.ValidatorIndex, slotSig []byte) (ssz.Marshaler, spec.DataVersion, error) { - return bn.bn.SubmitAggregateSelectionProof(slot, committeeIndex, committeeLength, index, slotSig) + return bn.Bn.SubmitAggregateSelectionProof(slot, committeeIndex, committeeLength, index, slotSig) } func (bn *TestingBeaconNodeWrapped) GetBeaconNetwork() spectypes.BeaconNetwork { - return bn.bn.GetBeaconNetwork() + return bn.Bn.GetBeaconNetwork() } func (bn *TestingBeaconNodeWrapped) GetBeaconBlock(slot phase0.Slot, graffiti, randao []byte) (ssz.Marshaler, spec.DataVersion, error) { - return bn.bn.GetBeaconBlock(slot, graffiti, randao) + return bn.Bn.GetBeaconBlock(slot, graffiti, randao) } func (bn *TestingBeaconNodeWrapped) SubmitValidatorRegistration(pubkey []byte, feeRecipient bellatrix.ExecutionAddress, sig phase0.BLSSignature) error { - return bn.bn.SubmitValidatorRegistration(pubkey, feeRecipient, sig) + return bn.Bn.SubmitValidatorRegistration(pubkey, feeRecipient, sig) } func (bn *TestingBeaconNodeWrapped) SubmitVoluntaryExit(voluntaryExit *phase0.SignedVoluntaryExit) error { - return bn.bn.SubmitVoluntaryExit(voluntaryExit) + return bn.Bn.SubmitVoluntaryExit(voluntaryExit) } func (bn *TestingBeaconNodeWrapped) SubmitAttestations(attestations []*phase0.Attestation) error { - return bn.bn.SubmitAttestations(attestations) + return bn.Bn.SubmitAttestations(attestations) } func (bn *TestingBeaconNodeWrapped) SubmitSyncMessages(msgs []*altair.SyncCommitteeMessage) error { - return bn.bn.SubmitSyncMessages(msgs) + return bn.Bn.SubmitSyncMessages(msgs) } func (bn *TestingBeaconNodeWrapped) SubmitBlindedBeaconBlock(block *api.VersionedBlindedProposal, sig phase0.BLSSignature) error { - return bn.bn.SubmitBlindedBeaconBlock(block, sig) + return bn.Bn.SubmitBlindedBeaconBlock(block, sig) } func (bn *TestingBeaconNodeWrapped) SubmitSignedContributionAndProof(contribution *altair.SignedContributionAndProof) error { - return bn.bn.SubmitSignedContributionAndProof(contribution) + return bn.Bn.SubmitSignedContributionAndProof(contribution) } func (bn *TestingBeaconNodeWrapped) SubmitSignedAggregateSelectionProof(msg *phase0.SignedAggregateAndProof) error { - return bn.bn.SubmitSignedAggregateSelectionProof(msg) + return bn.Bn.SubmitSignedAggregateSelectionProof(msg) } func (bn *TestingBeaconNodeWrapped) SubmitBeaconBlock(block *api.VersionedProposal, sig phase0.BLSSignature) error { - return bn.bn.SubmitBeaconBlock(block, sig) + return bn.Bn.SubmitBeaconBlock(block, sig) } func NewTestingBeaconNodeWrapped() beacon.BeaconNode { bnw := &TestingBeaconNodeWrapped{} - bnw.bn = spectestingutils.NewTestingBeaconNode() + bnw.Bn = spectestingutils.NewTestingBeaconNode() return bnw } diff --git a/message/validation/common_checks.go b/message/validation/common_checks.go index 9c17c6d948..76e611d2a1 100644 --- a/message/validation/common_checks.go +++ b/message/validation/common_checks.go @@ -101,7 +101,9 @@ func (mv *messageValidator) validateBeaconDuty( // Rule: For a proposal duty message, we check if the validator is assigned to it if role == spectypes.RoleProposer { epoch := mv.netCfg.Beacon.EstimatedEpochAtSlot(slot) - if mv.dutyStore.Proposer.ValidatorDuty(epoch, slot, indices[0]) == nil { + // Non-committee roles always have one validator index. + validatorIndex := indices[0] + if mv.dutyStore.Proposer.ValidatorDuty(epoch, slot, validatorIndex) == nil { return ErrNoDuty } } @@ -109,7 +111,9 @@ func (mv *messageValidator) validateBeaconDuty( // Rule: For a sync committee aggregation duty message, we check if the validator is assigned to it if role == spectypes.RoleSyncCommitteeContribution { period := mv.netCfg.Beacon.EstimatedSyncCommitteePeriodAtEpoch(mv.netCfg.Beacon.EstimatedEpochAtSlot(slot)) - if mv.dutyStore.SyncCommittee.Duty(period, indices[0]) == nil { + // Non-committee roles always have one validator index. + validatorIndex := indices[0] + if mv.dutyStore.SyncCommittee.Duty(period, validatorIndex) == nil { return ErrNoDuty } } diff --git a/message/validation/consensus_validation.go b/message/validation/consensus_validation.go index fbfc09de7d..c9fe569e34 100644 --- a/message/validation/consensus_validation.go +++ b/message/validation/consensus_validation.go @@ -330,10 +330,6 @@ func (mv *messageValidator) processSignerState(signedSSVMessage *spectypes.Signe return nil } -func (mv *messageValidator) maxSlotsInState() phase0.Slot { - return phase0.Slot(mv.netCfg.SlotsPerEpoch()) + lateSlotAllowance -} - func (mv *messageValidator) validateJustifications(message *specqbft.Message) error { pj, err := message.GetPrepareJustifications() if err != nil { diff --git a/network/commons/common.go b/network/commons/common.go index f62a4c03ca..6c4929a603 100644 --- a/network/commons/common.go +++ b/network/commons/common.go @@ -24,8 +24,8 @@ const ( peersForSync = 10 - // subnetsCount returns the subnet count for genesis - subnetsCount uint64 = 128 + // SubnetsCount returns the subnet count for genesis + SubnetsCount uint64 = 128 // UnknownSubnet is used when a validator public key is invalid UnknownSubnet = "unknown" @@ -99,12 +99,12 @@ func ValidatorSubnet(validatorPKHex string) int { return -1 } val := hexToUint64(validatorPKHex[:10]) - return int(val % subnetsCount) + return int(val % SubnetsCount) } // CommitteeSubnet returns the subnet for the given committee func CommitteeSubnet(cid spectypes.CommitteeID) int { - subnet := new(big.Int).Mod(new(big.Int).SetBytes(cid[:]), new(big.Int).SetUint64(subnetsCount)) + subnet := new(big.Int).Mod(new(big.Int).SetBytes(cid[:]), new(big.Int).SetUint64(SubnetsCount)) return int(subnet.Int64()) } @@ -126,7 +126,7 @@ func MsgID() MsgIDFunc { // Subnets returns the subnets count for this fork func Subnets() int { - return int(subnetsCount) + return int(SubnetsCount) } // Topics returns the available topics for this fork. diff --git a/network/discovery/dv5_service.go b/network/discovery/dv5_service.go index d62265f585..7d6cf130d3 100644 --- a/network/discovery/dv5_service.go +++ b/network/discovery/dv5_service.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - spectypes "github.com/ssvlabs/ssv-spec/types" "net" "sync/atomic" "time" @@ -61,10 +60,6 @@ type DiscV5Service struct { subnets []byte } -func (dvs *DiscV5Service) UpdateDomainType(logger *zap.Logger, domain spectypes.DomainType) error { - return records.SetDomainTypeEntry(dvs.dv5Listener.LocalNode(), records.KeyDomainType, domain) -} - func newDiscV5Service(pctx context.Context, logger *zap.Logger, discOpts *Options) (Service, error) { ctx, cancel := context.WithCancel(pctx) dvs := DiscV5Service{ diff --git a/network/discovery/dv5_service_test.go b/network/discovery/dv5_service_test.go index ebe4b2dce3..f1129f0733 100644 --- a/network/discovery/dv5_service_test.go +++ b/network/discovery/dv5_service_test.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "github.com/ssvlabs/ssv/networkconfig" "net" "os" "testing" @@ -116,7 +117,7 @@ func TestCheckPeer(t *testing.T) { ctx: ctx, conns: &mock.MockConnectionIndex{LimitValue: true}, subnetsIdx: subnetIndex, - domainType: &TestDomainType{myDomainType}, + domainType: networkconfig.TestNetwork, subnets: mySubnets, } diff --git a/network/discovery/service.go b/network/discovery/service.go index ac62b76c55..2f3640b9d0 100644 --- a/network/discovery/service.go +++ b/network/discovery/service.go @@ -10,7 +10,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/zap" - spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/network/peers" "github.com/ssvlabs/ssv/networkconfig" ) @@ -53,7 +52,6 @@ type Service interface { RegisterSubnets(logger *zap.Logger, subnets ...int) error DeregisterSubnets(logger *zap.Logger, subnets ...int) error Bootstrap(logger *zap.Logger, handler HandleNewPeer) error - UpdateDomainType(logger *zap.Logger, domain spectypes.DomainType) error } // NewService creates new discovery.Service diff --git a/network/network.go b/network/network.go index 4f8fee669e..f804b1459b 100644 --- a/network/network.go +++ b/network/network.go @@ -2,11 +2,11 @@ package network import ( "context" + "github.com/libp2p/go-libp2p/core/peer" "io" "go.uber.org/zap" - spectypes "github.com/ssvlabs/ssv-spec/types" protocolp2p "github.com/ssvlabs/ssv/protocol/v2/p2p" "github.com/ssvlabs/ssv/protocol/v2/ssv/queue" ) @@ -38,10 +38,11 @@ type P2PNetwork interface { SubscribeAll(logger *zap.Logger) error // SubscribeRandoms subscribes to random subnets SubscribeRandoms(logger *zap.Logger, numSubnets int) error - // UpdateDomainType switches domain type at ENR when we reach fork epoch - UpdateDomainType(logger *zap.Logger, domain spectypes.DomainType) error // UpdateScoreParams will update the scoring parameters of GossipSub UpdateScoreParams(logger *zap.Logger) + + // used for tests and api + PeersByTopic() ([]peer.ID, map[string][]peer.ID) } // GetValidatorStats returns stats of validators, including the following: diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 2f4bc8c929..93e6b9fc66 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -2,7 +2,7 @@ package p2pv1 import ( "context" - "encoding/hex" + "errors" "sync/atomic" "time" @@ -83,10 +83,14 @@ type p2pNetwork struct { state int32 activeValidators *hashmap.Map[string, validatorStatus] + activeCommittees *hashmap.Map[string, validatorStatus] backoffConnector *libp2pdiscbackoff.BackoffConnector - subnets []byte - libConnManager connmgrcore.ConnManager + + fixedSubnets []byte + activeSubnets []byte + + libConnManager connmgrcore.ConnManager nodeStorage operatorstorage.Storage operatorPKHashToPKCache *hashmap.Map[string, []byte] // used for metrics @@ -110,6 +114,7 @@ func New(logger *zap.Logger, cfg *Config, mr Metrics) network.P2PNetwork { msgValidator: cfg.MessageValidator, state: stateClosed, activeValidators: hashmap.New[string, validatorStatus](), + activeCommittees: hashmap.New[string, validatorStatus](), nodeStorage: cfg.NodeStorage, operatorPKHashToPKCache: hashmap.New[string, []byte](), operatorSigner: cfg.OperatorSigner, @@ -209,7 +214,7 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() { defer cancel() connMgr := peers.NewConnManager(logger, n.libConnManager, n.idx) - mySubnets := records.Subnets(n.subnets).Clone() + mySubnets := records.Subnets(n.activeSubnets).Clone() connMgr.TagBestPeers(logger, n.cfg.MaxPeers-1, mySubnets, allPeers, n.cfg.TopicMaxPeers) connMgr.TrimPeers(ctx, logger, n.host.Network()) } @@ -260,57 +265,88 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { for ; true; <-ticker.C { start := time.Now() - // Compute the new subnets according to the active validators. - newSubnets := make([]byte, commons.Subnets()) - copy(newSubnets, n.subnets) - n.activeValidators.Range(func(pkHex string, status validatorStatus) bool { - // TODO: alan - fork support - pbBytes, err := hex.DecodeString(pkHex) - if err != nil { - return false - } - // TODO: alan - optimization to get active committees only - share := n.nodeStorage.Shares().Get(nil, pbBytes) - if share == nil { - return false - } - cid := share.CommitteeID() - subnet := commons.CommitteeSubnet(cid) - newSubnets[subnet] = byte(1) + // Compute the new subnets according to the active committees/validators. + updatedSubnets := make([]byte, commons.Subnets()) + copy(updatedSubnets, n.fixedSubnets) + + n.activeCommittees.Range(func(cid string, status validatorStatus) bool { + subnet := commons.CommitteeSubnet(spectypes.CommitteeID([]byte(cid))) + updatedSubnets[subnet] = byte(1) return true }) - n.subnets = newSubnets + + if !n.cfg.Network.PastAlanFork() { + n.activeValidators.Range(func(pkHex string, status validatorStatus) bool { + subnet := commons.ValidatorSubnet(pkHex) + updatedSubnets[subnet] = byte(1) + return true + }) + } + n.activeSubnets = updatedSubnets // Compute the not yet registered subnets. - unregisteredSubnets := make([]int, 0) - for subnet, active := range newSubnets { + addedSubnets := make([]int, 0) + for subnet, active := range updatedSubnets { if active == byte(1) && registeredSubnets[subnet] == byte(0) { - unregisteredSubnets = append(unregisteredSubnets, subnet) + addedSubnets = append(addedSubnets, subnet) } } - registeredSubnets = newSubnets - if len(unregisteredSubnets) == 0 { + // Compute the not anymore registered subnets. + removedSubnets := make([]int, 0) + for subnet, active := range registeredSubnets { + if active == byte(1) && updatedSubnets[subnet] == byte(0) { + removedSubnets = append(removedSubnets, subnet) + } + } + + registeredSubnets = updatedSubnets + + if len(addedSubnets) == 0 && len(removedSubnets) == 0 { continue } n.idx.UpdateSelfRecord(func(self *records.NodeInfo) *records.NodeInfo { - self.Metadata.Subnets = records.Subnets(n.subnets).String() + self.Metadata.Subnets = records.Subnets(n.activeSubnets).String() return self }) - err := n.disc.RegisterSubnets(logger.Named(logging.NameDiscoveryService), unregisteredSubnets...) - if err != nil { - logger.Warn("could not register subnets", zap.Error(err)) - continue + var errs error + if len(addedSubnets) > 0 { + err := n.disc.RegisterSubnets(logger.Named(logging.NameDiscoveryService), addedSubnets...) + if err != nil { + logger.Debug("could not register subnets", zap.Error(err)) + errs = errors.Join(errs, err) + } } + if len(removedSubnets) > 0 { + err := n.disc.DeregisterSubnets(logger.Named(logging.NameDiscoveryService), removedSubnets...) + if err != nil { + logger.Debug("could not unregister subnets", zap.Error(err)) + errs = errors.Join(errs, err) + } + + // Unsubscribe from the removed subnets. + for _, subnet := range removedSubnets { + if err := n.unsubscribeSubnet(logger, uint(subnet)); err != nil { + logger.Debug("could not unsubscribe from subnet", zap.Int("subnet", subnet), zap.Error(err)) + errs = errors.Join(errs, err) + } else { + logger.Debug("unsubscribed from subnet", zap.Int("subnet", subnet)) + } + } + } + allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) - subnetsList := records.SharedSubnets(allSubs, n.subnets, 0) + subnetsList := records.SharedSubnets(allSubs, n.activeSubnets, 0) logger.Debug("updated subnets", - zap.Any("added", unregisteredSubnets), + zap.Any("added", addedSubnets), + zap.Any("removed", removedSubnets), zap.Any("subnets", subnetsList), + zap.Any("subscribed_topics", n.topicsCtrl.Topics()), zap.Int("total_subnets", len(subnetsList)), zap.Duration("took", time.Since(start)), + zap.Error(errs), ) } } @@ -323,19 +359,30 @@ func (n *p2pNetwork) UpdateScoreParams(logger *zap.Logger) { logger = logger.Named(logging.NameP2PNetwork) - // Create ticker - oneEpochDuration := n.cfg.Network.Beacon.SlotDurationSec() * time.Duration(n.cfg.Network.Beacon.SlotsPerEpoch()) - ticker := time.NewTicker(oneEpochDuration) - defer ticker.Stop() + // function to get the starting time of the next epoch + nextEpochStartingTime := func() time.Time { + currEpoch := n.cfg.Network.Beacon.EstimatedCurrentEpoch() + nextEpoch := currEpoch + 1 + return n.cfg.Network.Beacon.EpochStartTime(nextEpoch) + } + + // Create timer that triggers on the beginning of the next epoch + timer := time.NewTimer(time.Until(nextEpochStartingTime())) + defer timer.Stop() // Run immediately and then once every epoch - for ; true; <-ticker.C { + for ; true; <-timer.C { + + // Update score parameters err := n.topicsCtrl.UpdateScoreParams(logger) if err != nil { logger.Debug("score parameters update failed", zap.Error(err)) } else { logger.Debug("updated score parameters successfully") } + + // Reset to trigger on the beginning of the next epoch + timer.Reset(time.Until(nextEpochStartingTime())) } } @@ -346,13 +393,3 @@ func (n *p2pNetwork) getMaxPeers(topic string) int { } return n.cfg.TopicMaxPeers } - -// UpdateDomainAtFork updates Domain Type at ENR node record after fork epoch. -func (n *p2pNetwork) UpdateDomainType(logger *zap.Logger, domain spectypes.DomainType) error { - if err := n.disc.UpdateDomainType(logger, domain); err != nil { - logger.Error("could not update domain type", zap.Error(err)) - return err - } - logger.Debug("updated and published ENR with domain type", fields.Domain(domain)) - return nil -} diff --git a/network/p2p/p2p_pubsub.go b/network/p2p/p2p_pubsub.go index 954625a77d..eda36f4a75 100644 --- a/network/p2p/p2p_pubsub.go +++ b/network/p2p/p2p_pubsub.go @@ -10,7 +10,6 @@ import ( "github.com/ssvlabs/ssv/protocol/v2/message" pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" spectypes "github.com/ssvlabs/ssv-spec/types" "go.uber.org/zap" @@ -36,24 +35,12 @@ func (n *p2pNetwork) UseMessageRouter(router network.MessageRouter) { n.msgRouter = router } -// Peers registers a message router to handle incoming messages -func (n *p2pNetwork) Peers(pk spectypes.ValidatorPK) ([]peer.ID, error) { - all := make([]peer.ID, 0) - // TODO: Alan - fork support - share := n.nodeStorage.Shares().Get(nil, pk[:]) - if share == nil { - return nil, fmt.Errorf("could not find validator: %x", pk[:]) - } - cmtid := share.CommitteeID() - topics := commons.CommitteeTopicID(cmtid) - for _, topic := range topics { - peers, err := n.topicsCtrl.Peers(topic) - if err != nil { - return nil, err - } - all = append(all, peers...) +func dutyExecutorToID(fork bool, dutyExecutor []byte) []byte { + if fork { + return dutyExecutor[16:] + } else { + return dutyExecutor } - return all, nil } // Broadcast publishes the message to all peers in subnet @@ -82,25 +69,14 @@ func (n *p2pNetwork) Broadcast(msgID spectypes.MessageID, msg *spectypes.SignedS return fmt.Errorf("could not encode signed ssv message: %w", err) } - var committeeID spectypes.CommitteeID - if msg.SSVMessage.MsgID.GetRoleType() == spectypes.RoleCommittee { - committeeID = spectypes.CommitteeID(msg.SSVMessage.MsgID.GetDutyExecutorID()[16:]) - } else { - share := n.nodeStorage.ValidatorStore().Validator(msg.SSVMessage.MsgID.GetDutyExecutorID()) - if share == nil { - return fmt.Errorf("could not find validator: %x", msg.SSVMessage.MsgID.GetDutyExecutorID()) - } - committeeID = share.CommitteeID() + topics, err := n.broadcastTopics(dutyExecutorToID(n.cfg.Network.PastAlanFork(), msg.SSVMessage.MsgID.GetDutyExecutorID())) + if err != nil { + return fmt.Errorf("could not get validator topics: %w", err) } - topics := commons.CommitteeTopicID(committeeID) for _, topic := range topics { - n.interfaceLogger.Debug("broadcasting msg", - zap.String("committee_id", hex.EncodeToString(committeeID[:])), - zap.Int("msg_type", int(msg.SSVMessage.MsgType)), - fields.Topic(topic)) if err := n.topicsCtrl.Broadcast(topic, encodedMsg, n.cfg.RequestTimeout); err != nil { - n.interfaceLogger.Debug("could not broadcast msg", fields.CommitteeID(committeeID), zap.Error(err)) + n.interfaceLogger.Debug("could not broadcast msg", fields.Topic(topic), zap.Error(err)) return fmt.Errorf("could not broadcast msg: %w", err) } } @@ -111,7 +87,7 @@ func (n *p2pNetwork) SubscribeAll(logger *zap.Logger) error { if !n.isReady() { return p2pprotocol.ErrNetworkIsNotReady } - n.subnets, _ = records.Subnets{}.FromString(records.AllSubnets) + n.fixedSubnets, _ = records.Subnets{}.FromString(records.AllSubnets) for subnet := 0; subnet < commons.Subnets(); subnet++ { err := n.topicsCtrl.Subscribe(logger, commons.SubnetTopicID(subnet)) if err != nil { @@ -143,11 +119,11 @@ func (n *p2pNetwork) SubscribeRandoms(logger *zap.Logger, numSubnets int) error // Update the subnets slice. subnets := make([]byte, commons.Subnets()) - copy(subnets, n.subnets) + copy(subnets, n.fixedSubnets) for _, subnet := range randomSubnets { subnets[subnet] = byte(1) } - n.subnets = subnets + n.fixedSubnets = subnets return nil } @@ -157,62 +133,91 @@ func (n *p2pNetwork) Subscribe(pk spectypes.ValidatorPK) error { if !n.isReady() { return p2pprotocol.ErrNetworkIsNotReady } + + share := n.nodeStorage.ValidatorStore().Validator(pk[:]) + if share == nil { + return fmt.Errorf("could not find share for validator %s", hex.EncodeToString(pk[:])) + } + + err := n.subscribeCommittee(share.CommitteeID()) + if err != nil { + return fmt.Errorf("could not subscribe to committee: %w", err) + } + if !n.cfg.Network.PastAlanFork() { + return n.subscribeValidator(pk) + } + return nil +} + +// subscribeCommittee handles the subscription logic for committee subnets +func (n *p2pNetwork) subscribeCommittee(cid spectypes.CommitteeID) error { + n.interfaceLogger.Debug("subscribing to committee", fields.CommitteeID(cid)) + status, found := n.activeCommittees.GetOrInsert(string(cid[:]), validatorStatusSubscribing) + if found && status != validatorStatusInactive { + return nil + } + + for _, topic := range commons.CommitteeTopicID(cid) { + if err := n.topicsCtrl.Subscribe(n.interfaceLogger, topic); err != nil { + return fmt.Errorf("could not subscribe to topic %s: %w", topic, err) + } + } + n.activeCommittees.Set(string(cid[:]), validatorStatusSubscribed) + return nil +} + +// subscribeValidator handles the subscription logic for validator subnets +func (n *p2pNetwork) subscribeValidator(pk spectypes.ValidatorPK) error { pkHex := hex.EncodeToString(pk[:]) + n.interfaceLogger.Debug("subscribing to validator", zap.String("validator", pkHex)) status, found := n.activeValidators.GetOrInsert(pkHex, validatorStatusSubscribing) if found && status != validatorStatusInactive { return nil } - err := n.subscribe(n.interfaceLogger, pk) - if err != nil { - return err + for _, topic := range commons.ValidatorTopicID(pk[:]) { + if err := n.topicsCtrl.Subscribe(n.interfaceLogger, topic); err != nil { + return fmt.Errorf("could not subscribe to topic %s: %w", topic, err) + } } n.activeValidators.Set(pkHex, validatorStatusSubscribed) return nil } -// Unsubscribe unsubscribes from the validator subnet -func (n *p2pNetwork) Unsubscribe(logger *zap.Logger, pk spectypes.ValidatorPK) error { +func (n *p2pNetwork) unsubscribeSubnet(logger *zap.Logger, subnet uint) error { if !n.isReady() { return p2pprotocol.ErrNetworkIsNotReady } - pkHex := hex.EncodeToString(pk[:]) - if status, _ := n.activeValidators.Get(pkHex); status != validatorStatusSubscribed { - return nil + if subnet >= uint(commons.Subnets()) { + return fmt.Errorf("invalid subnet %d", subnet) } - cmtid := n.nodeStorage.ValidatorStore().Validator(pk[:]).CommitteeID() - topics := commons.CommitteeTopicID(cmtid) - for _, topic := range topics { - if err := n.topicsCtrl.Unsubscribe(logger, topic, false); err != nil { - return err - } + if err := n.topicsCtrl.Unsubscribe(logger, commons.SubnetTopicID(int(subnet)), false); err != nil { + return fmt.Errorf("could not unsubscribe from subnet %d: %w", subnet, err) } - n.activeValidators.Del(pkHex) return nil } -//TODO: alan genesis -// subscribe to validator topics, as defined in the fork -//func (n *p2pNetwork) subscribe(logger *zap.Logger, pk spectypes.ValidatorPK) error { -// topics := commons.ValidatorTopicID(pk[:]) -// for _, topic := range topics { -// if err := n.topicsCtrl.Subscribe(logger, topic); err != nil { -// // return errors.Wrap(err, "could not broadcast message") -// return err -// } -// } -// return nil -//} - -// subscribe to validator topics, as defined in the fork -func (n *p2pNetwork) subscribe(logger *zap.Logger, pk spectypes.ValidatorPK) error { +// Unsubscribe unsubscribes from the validator subnet +func (n *p2pNetwork) Unsubscribe(logger *zap.Logger, pk spectypes.ValidatorPK) error { + if !n.isReady() { + return p2pprotocol.ErrNetworkIsNotReady + } + + if !n.cfg.Network.PastAlanFork() { + pkHex := hex.EncodeToString(pk[:]) + if status, _ := n.activeValidators.Get(pkHex); status != validatorStatusSubscribed { + return nil + } + n.activeValidators.Del(pkHex) + } + cmtid := n.nodeStorage.ValidatorStore().Validator(pk[:]).CommitteeID() topics := commons.CommitteeTopicID(cmtid) for _, topic := range topics { - if err := n.topicsCtrl.Subscribe(logger, topic); err != nil { - // return errors.Wrap(err, "could not broadcast message") + if err := n.topicsCtrl.Unsubscribe(logger, topic, false); err != nil { return err } } + n.activeCommittees.Del(string(cmtid[:])) return nil } @@ -238,13 +243,6 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.C return errors.New("message was not decoded") } - //p2pID := decodedMsg.GetID().String() - - // logger.With( - // zap.String("pubKey", hex.EncodeToString(ssvMsg.MsgID.GetPubKey())), - // zap.String("role", ssvMsg.MsgID.GetRoleType().String()), - // ).Debug("handlePubsubMessages") - metricsRouterIncoming.WithLabelValues(message.MsgTypeToString(decodedMsg.MsgType)).Inc() n.msgRouter.Route(ctx, decodedMsg) @@ -255,11 +253,11 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.C // subscribeToSubnets subscribes to all the node's subnets func (n *p2pNetwork) subscribeToSubnets(logger *zap.Logger) error { - if len(n.subnets) == 0 { + if len(n.fixedSubnets) == 0 { return nil } - logger.Debug("subscribing to subnets", fields.Subnets(n.subnets)) - for i, val := range n.subnets { + logger.Debug("subscribing to fixed subnets", fields.Subnets(n.fixedSubnets)) + for i, val := range n.fixedSubnets { if val > 0 { subnet := fmt.Sprintf("%d", i) if err := n.topicsCtrl.Subscribe(logger, subnet); err != nil { @@ -271,3 +269,15 @@ func (n *p2pNetwork) subscribeToSubnets(logger *zap.Logger) error { } return nil } + +func (n *p2pNetwork) broadcastTopics(id []byte) ([]string, error) { + if n.cfg.Network.PastAlanFork() { + share := n.nodeStorage.ValidatorStore().Committee(spectypes.CommitteeID(id)) + if share == nil { + return nil, fmt.Errorf("could not find share for validator %s", hex.EncodeToString(id)) + } + cid := share.ID + return commons.CommitteeTopicID(cid), nil + } + return commons.ValidatorTopicID(id), nil +} diff --git a/network/p2p/p2p_setup.go b/network/p2p/p2p_setup.go index f7cdcdf70f..b104999628 100644 --- a/network/p2p/p2p_setup.go +++ b/network/p2p/p2p_setup.go @@ -94,9 +94,9 @@ func (n *p2pNetwork) initCfg() error { if err != nil { return fmt.Errorf("parse subnet: %w", err) } - n.subnets = subnets + n.fixedSubnets = subnets } else { - n.subnets = make(records.Subnets, p2pcommons.Subnets()) + n.fixedSubnets = make(records.Subnets, p2pcommons.Subnets()) } if n.cfg.MaxPeers <= 0 { n.cfg.MaxPeers = minPeersBuffer @@ -174,7 +174,7 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error { self := records.NewNodeInfo(domain) self.Metadata = &records.NodeMetadata{ NodeVersion: commons.GetNodeVersion(), - Subnets: records.Subnets(n.subnets).String(), + Subnets: records.Subnets(n.fixedSubnets).String(), } getPrivKey := func() crypto.PrivKey { return libPrivKey @@ -195,7 +195,7 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error { } subnetsProvider := func() records.Subnets { - return n.subnets + return n.activeSubnets } filters := func() []connections.HandshakeFilter { @@ -242,8 +242,9 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error { Bootnodes: n.cfg.TransformBootnodes(), EnableLogging: n.cfg.DiscoveryTrace, } - if len(n.subnets) > 0 { - discV5Opts.Subnets = n.subnets + if len(n.fixedSubnets) > 0 { + discV5Opts.Subnets = n.fixedSubnets + logger = logger.With(zap.String("subnets", records.Subnets(n.fixedSubnets).String())) } logger.Info("discovery: using discv5", zap.Strings("bootnodes", discV5Opts.Bootnodes)) } else { @@ -271,11 +272,12 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error { func (n *p2pNetwork) setupPubsub(logger *zap.Logger) error { cfg := &topics.PubSubConfig{ - Host: n.host, - TraceLog: n.cfg.PubSubTrace, - MsgValidator: n.msgValidator, - MsgHandler: n.handlePubsubMessages(logger), - ScoreIndex: n.idx, + NetworkConfig: n.cfg.Network, + Host: n.host, + TraceLog: n.cfg.PubSubTrace, + MsgValidator: n.msgValidator, + MsgHandler: n.handlePubsubMessages(logger), + ScoreIndex: n.idx, //Discovery: n.disc, OutboundQueueSize: n.cfg.PubsubOutQueueSize, ValidationQueueSize: n.cfg.PubsubValidationQueueSize, diff --git a/network/p2p/p2p_sync.go b/network/p2p/p2p_sync.go index 9a5f9c9028..3c680bb56d 100644 --- a/network/p2p/p2p_sync.go +++ b/network/p2p/p2p_sync.go @@ -1,9 +1,7 @@ package p2pv1 import ( - "encoding/hex" "fmt" - "math/rand" "time" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" @@ -81,39 +79,6 @@ func (n *p2pNetwork) handleStream(logger *zap.Logger, handler p2pprotocol.Reques } } -// getSubsetOfPeers returns a subset of the peers from that topic -func (n *p2pNetwork) getSubsetOfPeers(logger *zap.Logger, senderID []byte, maxPeers int, filter func(peer.ID) bool) (peers []peer.ID, err error) { - var ps []peer.ID - seen := make(map[peer.ID]struct{}) - // TODO: fork support - topics := commons.CommitteeTopicID(spectypes.CommitteeID(senderID[16:])) - for _, topic := range topics { - ps, err = n.topicsCtrl.Peers(topic) - if err != nil { - continue - } - for _, p := range ps { - if _, ok := seen[p]; !ok && filter(p) { - peers = append(peers, p) - seen[p] = struct{}{} - } - } - } - // if we seen some peers, ignore the error - if err != nil && len(seen) == 0 { - return nil, errors.Wrapf(err, "could not read peers for validator %s", hex.EncodeToString(senderID)) - } - if len(peers) == 0 { - return nil, nil - } - if maxPeers > len(peers) { - maxPeers = len(peers) - } else { - rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) - } - return peers[:maxPeers], nil -} - func (n *p2pNetwork) makeSyncRequest(logger *zap.Logger, peers []peer.ID, mid spectypes.MessageID, protocol libp2p_protocol.ID, syncMsg *message.SyncMessage) ([]p2pprotocol.SyncResult, error) { var results []p2pprotocol.SyncResult data, err := syncMsg.Encode() diff --git a/network/p2p/p2p_test.go b/network/p2p/p2p_test.go index 3db2bcbc36..bac63ae28f 100644 --- a/network/p2p/p2p_test.go +++ b/network/p2p/p2p_test.go @@ -3,6 +3,14 @@ package p2pv1 import ( "context" "encoding/hex" + "github.com/pkg/errors" + genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft" + "github.com/ssvlabs/ssv/network" + "github.com/ssvlabs/ssv/network/commons" + "github.com/ssvlabs/ssv/protocol/v2/message" + p2pprotocol "github.com/ssvlabs/ssv/protocol/v2/p2p" + "github.com/ssvlabs/ssv/protocol/v2/ssv/queue" + "math/rand" "sync" "sync/atomic" "testing" @@ -14,7 +22,6 @@ import ( specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" - spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/networkconfig" ) @@ -33,13 +40,12 @@ func TestP2pNetwork_SubscribeBroadcast(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ks := spectestingutils.Testing4SharesSet() pks := []string{"8e80066551a81b318258709edaf7dd1f63cd686a0e4db8b29bbb7acfe65608677af5a527d9448ee47835485e02b50bc0"} - ln, routers, err := CreateNetworkAndSubscribeFromKeySet(t, ctx, LocalNetOptions{ + ln, routers, err := createNetworkAndSubscribe(t, ctx, LocalNetOptions{ Nodes: n, MinConnected: n/2 - 1, UseDiscv5: false, - }, ks, pks...) + }, pks...) require.NoError(t, err) require.NotNil(t, routers) require.NotNil(t, ln) @@ -111,12 +117,11 @@ func TestP2pNetwork_Stream(t *testing.T) { defer cancel() pkHex := "8e80066551a81b318258709edaf7dd1f63cd686a0e4db8b29bbb7acfe65608677af5a527d9448ee47835485e02b50bc0" - ks := spectestingutils.Testing4SharesSet() - ln, _, err := CreateNetworkAndSubscribeFromKeySet(t, ctx, LocalNetOptions{ + ln, _, err := createNetworkAndSubscribe(t, ctx, LocalNetOptions{ Nodes: n, MinConnected: n/2 - 1, UseDiscv5: false, - }, ks, pkHex) + }, pkHex) defer func() { for _, node := range ln.Nodes { @@ -216,3 +221,191 @@ func TestWaitSubsetOfPeers(t *testing.T) { }) } } + +func dummyMsgCommittee(t *testing.T, pkHex string, height int) (spectypes.MessageID, *spectypes.SignedSSVMessage) { + return dummyMsg(t, pkHex, height, spectypes.RoleCommittee) +} + +func dummyMsg(t *testing.T, pkHex string, height int, role spectypes.RunnerRole) (spectypes.MessageID, *spectypes.SignedSSVMessage) { + pk, err := hex.DecodeString(pkHex) + require.NoError(t, err) + id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk, role) + signedMsg := &genesisspecqbft.SignedMessage{ + Message: genesisspecqbft.Message{ + MsgType: genesisspecqbft.CommitMsgType, + Round: 2, + Identifier: id[:], + Height: genesisspecqbft.Height(height), + Root: [32]byte{0x1, 0x2, 0x3}, + }, + Signature: []byte("sVV0fsvqQlqliKv/ussGIatxpe8LDWhc9uoaM5WpjbiYvvxUr1eCpz0ja7UT1PGNDdmoGi6xbMC1g/ozhAt4uCdpy0Xdfqbv"), + Signers: []spectypes.OperatorID{1, 3, 4}, + } + data, err := signedMsg.Encode() + require.NoError(t, err) + ssvMsg := &spectypes.SSVMessage{ + MsgType: spectypes.SSVConsensusMsgType, + MsgID: id, + Data: data, + } + signedSSVMsg, err := spectypes.SSVMessageToSignedSSVMessage(ssvMsg, 1, dummySignSSVMessage) + require.NoError(t, err) + + return id, signedSSVMsg +} + +func dummySignSSVMessage(_ *spectypes.SSVMessage) ([]byte, error) { + return []byte{}, nil +} + +type dummyRouter struct { + count uint64 + i int +} + +func (r *dummyRouter) Route(_ context.Context, _ *queue.DecodedSSVMessage) { + atomic.AddUint64(&r.count, 1) +} + +func createNetworkAndSubscribe(t *testing.T, ctx context.Context, options LocalNetOptions, pks ...string) (*LocalNet, []*dummyRouter, error) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + ln, err := CreateAndStartLocalNet(ctx, logger.Named("createNetworkAndSubscribe"), options) + if err != nil { + return nil, nil, err + } + if len(ln.Nodes) != options.Nodes { + return nil, nil, errors.Errorf("only %d peers created, expected %d", len(ln.Nodes), options.Nodes) + } + + logger.Debug("created local network") + + routers := make([]*dummyRouter, options.Nodes) + for i, node := range ln.Nodes { + routers[i] = &dummyRouter{ + i: i, + } + node.UseMessageRouter(routers[i]) + } + + logger.Debug("subscribing to topics") + + var wg sync.WaitGroup + for _, pk := range pks { + vpk, err := hex.DecodeString(pk) + if err != nil { + return nil, nil, errors.Wrap(err, "could not decode validator public key") + } + for _, node := range ln.Nodes { + wg.Add(1) + go func(node network.P2PNetwork, vpk spectypes.ValidatorPK) { + defer wg.Done() + if err := node.Subscribe(vpk); err != nil { + logger.Warn("could not subscribe to topic", zap.Error(err)) + } + }(node, spectypes.ValidatorPK(vpk)) + } + } + wg.Wait() + // let the nodes subscribe + for { + noPeers := false + for _, node := range ln.Nodes { + peers, _ := node.PeersByTopic() + if len(peers) < 2 { + noPeers = true + } + } + if noPeers { + noPeers = false + time.Sleep(time.Second * 1) + continue + } + break + } + + return ln, routers, nil +} + +func (n *p2pNetwork) LastDecided(logger *zap.Logger, mid spectypes.MessageID) ([]p2pprotocol.SyncResult, error) { + const ( + minPeers = 3 + waitTime = time.Second * 24 + ) + if !n.isReady() { + return nil, p2pprotocol.ErrNetworkIsNotReady + } + pid, maxPeers := commons.ProtocolID(p2pprotocol.LastDecidedProtocol) + peers, err := waitSubsetOfPeers(logger, n.getSubsetOfPeers, mid.GetDutyExecutorID(), minPeers, maxPeers, waitTime, allPeersFilter) + if err != nil { + return nil, errors.Wrap(err, "could not get subset of peers") + } + return n.makeSyncRequest(logger, peers, mid, pid, &message.SyncMessage{ + Params: &message.SyncParams{ + Identifier: mid, + }, + Protocol: message.LastDecidedType, + }) +} + +// getSubsetOfPeers returns a subset of the peers from that topic +func (n *p2pNetwork) getSubsetOfPeers(logger *zap.Logger, senderID []byte, maxPeers int, filter func(peer.ID) bool) (peers []peer.ID, err error) { + var ps []peer.ID + seen := make(map[peer.ID]struct{}) + topics := commons.ValidatorTopicID(senderID) + for _, topic := range topics { + ps, err = n.topicsCtrl.Peers(topic) + if err != nil { + continue + } + for _, p := range ps { + if _, ok := seen[p]; !ok && filter(p) { + peers = append(peers, p) + seen[p] = struct{}{} + } + } + } + // if we seen some peers, ignore the error + if err != nil && len(seen) == 0 { + return nil, errors.Wrapf(err, "could not read peers for validator %s", hex.EncodeToString(senderID)) + } + if len(peers) == 0 { + return nil, nil + } + if maxPeers > len(peers) { + maxPeers = len(peers) + } else { + rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) + } + return peers[:maxPeers], nil +} + +func registerHandler(logger *zap.Logger, node network.P2PNetwork, mid spectypes.MessageID, height specqbft.Height, round specqbft.Round, counter *int64, errors chan<- error) { + node.RegisterHandlers(logger, &p2pprotocol.SyncHandler{ + Protocol: p2pprotocol.LastDecidedProtocol, + Handler: func(message *spectypes.SSVMessage) (*spectypes.SSVMessage, error) { + atomic.AddInt64(counter, 1) + sm := genesisspecqbft.SignedMessage{ + Signature: make([]byte, 96), + Signers: []spectypes.OperatorID{1, 2, 3}, + Message: genesisspecqbft.Message{ + MsgType: genesisspecqbft.CommitMsgType, + Height: genesisspecqbft.Height(height), + Round: genesisspecqbft.Round(round), + Identifier: mid[:], + Root: [32]byte{1, 2, 3}, + }, + } + data, err := sm.Encode() + if err != nil { + errors <- err + return nil, err + } + return &spectypes.SSVMessage{ + MsgType: spectypes.SSVConsensusMsgType, + MsgID: mid, + Data: data, + }, nil + }, + }) +} diff --git a/network/p2p/p2p_validation_test.go b/network/p2p/p2p_validation_test.go index 7db166a5f4..a632261ced 100644 --- a/network/p2p/p2p_validation_test.go +++ b/network/p2p/p2p_validation_test.go @@ -5,6 +5,8 @@ import ( cryptorand "crypto/rand" "encoding/hex" "fmt" + "github.com/cornelk/hashmap" + "github.com/libp2p/go-libp2p/core/peer" "math/rand" "os" "sort" @@ -19,7 +21,6 @@ import ( "github.com/stretchr/testify/require" spectypes "github.com/ssvlabs/ssv-spec/types" - spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils" "github.com/ssvlabs/ssv/message/validation" ) @@ -54,12 +55,11 @@ func TestP2pNetwork_MessageValidation(t *testing.T) { ignoredRole = spectypes.RoleAggregator rejectedRole = spectypes.RoleSyncCommitteeContribution ) - messageValidators := CreateMsgValidators(&mtx, nodeCount, vNet) + messageValidators := make([]*MockMessageValidator, nodeCount) // Create a VirtualNet with 4 nodes. - ks := spectestingutils.Testing4SharesSet() vNet = CreateVirtualNet(t, ctx, 4, validators, func(nodeIndex int) validation.MessageValidator { return messageValidators[nodeIndex] - }, ks) + }) defer func() { require.NoError(t, vNet.Close()) }() @@ -204,3 +204,116 @@ func TestP2pNetwork_MessageValidation(t *testing.T) { } defer fmt.Println() } + +type MockMessageValidator struct { + Accepted []int + Ignored []int + Rejected []int + TotalAccepted int + TotalIgnored int + TotalRejected int + + ValidateFunc func(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult +} + +func (v *MockMessageValidator) ValidatorForTopic(topic string) func(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult { + return v.Validate +} + +func (v *MockMessageValidator) Validate(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult { + return v.ValidateFunc(ctx, p, pmsg) +} + +type NodeIndex int + +type VirtualNode struct { + Index NodeIndex + Network *p2pNetwork + PeerScores *hashmap.Map[NodeIndex, *pubsub.PeerScoreSnapshot] +} + +func (n *VirtualNode) Broadcast(msgID spectypes.MessageID, msg *spectypes.SignedSSVMessage) error { + return n.Network.Broadcast(msgID, msg) +} + +// VirtualNet is a utility to create & interact with a virtual network of nodes. +type VirtualNet struct { + Nodes []*VirtualNode +} + +func CreateVirtualNet( + t *testing.T, + ctx context.Context, + nodes int, + validatorPubKeys []string, + messageValidatorProvider func(int) validation.MessageValidator, +) *VirtualNet { + var doneSetup atomic.Bool + vn := &VirtualNet{} + ln, routers, err := createNetworkAndSubscribe(t, ctx, LocalNetOptions{ + Nodes: nodes, + MinConnected: nodes - 1, + UseDiscv5: false, + TotalValidators: 1000, + ActiveValidators: 800, + MyValidators: 300, + MessageValidatorProvider: messageValidatorProvider, + PeerScoreInspector: func(selfPeer peer.ID, peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) { + if !doneSetup.Load() { + return + } + node := vn.NodeByPeerID(selfPeer) + if node == nil { + t.Fatalf("self peer not found (%s)", selfPeer) + } + + node.PeerScores.Range(func(index NodeIndex, snapshot *pubsub.PeerScoreSnapshot) bool { + node.PeerScores.Del(index) + return true + }) + for peerID, peerScore := range peerMap { + peerNode := vn.NodeByPeerID(peerID) + if peerNode == nil { + t.Fatalf("peer not found (%s)", peerID) + } + node.PeerScores.Set(peerNode.Index, peerScore) + } + + }, + PeerScoreInspectorInterval: time.Millisecond * 5, + }, validatorPubKeys...) + + require.NoError(t, err) + require.NotNil(t, routers) + require.NotNil(t, ln) + + for i, node := range ln.Nodes { + vn.Nodes = append(vn.Nodes, &VirtualNode{ + Index: NodeIndex(i), + Network: node.(*p2pNetwork), + PeerScores: hashmap.New[NodeIndex, *pubsub.PeerScoreSnapshot](), //{}make(map[NodeIndex]*pubsub.PeerScoreSnapshot), + }) + } + doneSetup.Store(true) + + return vn +} + +func (vn *VirtualNet) NodeByPeerID(peerID peer.ID) *VirtualNode { + for _, node := range vn.Nodes { + if node.Network.Host().ID() == peerID { + return node + } + } + return nil +} + +func (vn *VirtualNet) Close() error { + for _, node := range vn.Nodes { + err := node.Network.Close() + if err != nil { + return err + } + } + return nil +} diff --git a/network/peers/connections/filters.go b/network/peers/connections/filters.go index 162ca6e823..7d05a265d4 100644 --- a/network/peers/connections/filters.go +++ b/network/peers/connections/filters.go @@ -17,3 +17,5 @@ func NetworkIDFilter(networkID string) HandshakeFilter { return nil } } + +// TODO: filter based on domaintype diff --git a/network/topics/container.go b/network/topics/container.go index a810946f8f..4fd2a32442 100644 --- a/network/topics/container.go +++ b/network/topics/container.go @@ -67,16 +67,17 @@ func (tc *topicsContainer) Join(name string, opts ...pubsub.TopicOpt) (*pubsub.T return topic, nil } -func (tc *topicsContainer) Unsubscribe(name string) { +func (tc *topicsContainer) Unsubscribe(name string) bool { tc.subLock.Lock() defer tc.subLock.Unlock() sub, ok := tc.subs[name] if !ok { - return + return false } delete(tc.subs, name) sub.Cancel() + return true } func (tc *topicsContainer) Subscribe(name string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) { diff --git a/network/topics/controller.go b/network/topics/controller.go index 88c629ec19..44866559ed 100644 --- a/network/topics/controller.go +++ b/network/topics/controller.go @@ -125,7 +125,7 @@ func (ctrl *topicsCtrl) UpdateScoreParams(logger *zap.Logger) error { continue } if err := topic.SetScoreParams(p); err != nil { - errs = errs + fmt.Sprintf("could not set score params for topic %s: %w; ", topicName, err) + errs = errs + fmt.Sprintf("could not set score params for topic %s: %d; ", topicName, err) continue } } @@ -210,7 +210,11 @@ func (ctrl *topicsCtrl) Broadcast(name string, data []byte, timeout time.Duratio // Unsubscribe unsubscribes from the given topic, only if there are no other subscribers of the given topic // if hard is true, we will unsubscribe the topic even if there are more subscribers. func (ctrl *topicsCtrl) Unsubscribe(logger *zap.Logger, name string, hard bool) error { - ctrl.container.Unsubscribe(name) + name = commons.GetTopicFullName(name) + + if !ctrl.container.Unsubscribe(name) { + return fmt.Errorf("failed to unsubscribe from topic %s: not subscribed", name) + } if ctrl.msgValidator != nil { err := ctrl.ps.UnregisterTopicValidator(name) @@ -289,15 +293,18 @@ func (ctrl *topicsCtrl) listen(logger *zap.Logger, sub *pubsub.Subscription) err func (ctrl *topicsCtrl) setupTopicValidator(name string) error { if ctrl.msgValidator != nil { // first try to unregister in case there is already a msg validator for that topic (e.g. fork scenario) - _ = ctrl.ps.UnregisterTopicValidator(name) + err := ctrl.ps.UnregisterTopicValidator(name) + if err != nil { + ctrl.logger.Debug("failed to unregister topic validator", zap.String("topic", name), zap.Error(err)) + } var opts []pubsub.ValidatorOpt // Optional: set a timeout for message validation // opts = append(opts, pubsub.WithValidatorTimeout(time.Second)) - err := ctrl.ps.RegisterTopicValidator(name, ctrl.msgValidator.ValidatorForTopic(name), opts...) + err = ctrl.ps.RegisterTopicValidator(name, ctrl.msgValidator.ValidatorForTopic(name), opts...) if err != nil { - return errors.Wrap(err, "could not register topic validator") + return fmt.Errorf("could not register topic validator: %w", err) } } return nil diff --git a/network/topics/controller_test.go b/network/topics/controller_test.go index 1873cc890a..8145703c2c 100644 --- a/network/topics/controller_test.go +++ b/network/topics/controller_test.go @@ -5,6 +5,10 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft" + registrystorage "github.com/ssvlabs/ssv/registry/storage" + "github.com/ssvlabs/ssv/storage/basedb" + "github.com/ssvlabs/ssv/storage/kv" "math" "sync" "sync/atomic" @@ -16,10 +20,10 @@ import ( "github.com/libp2p/go-libp2p/core/host" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + genesisvalidation "github.com/ssvlabs/ssv/message/validation/genesis" "github.com/stretchr/testify/require" "go.uber.org/zap" - specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/message/validation" @@ -27,7 +31,6 @@ import ( "github.com/ssvlabs/ssv/network/commons" "github.com/ssvlabs/ssv/network/discovery" "github.com/ssvlabs/ssv/networkconfig" - "github.com/ssvlabs/ssv/protocol/v2/ssv/queue" ) func TestTopicManager(t *testing.T) { @@ -68,7 +71,7 @@ func TestTopicManager(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - validator := validation.NewMessageValidator(networkconfig.TestNetwork) + validator := genesisvalidation.New(networkconfig.TestNetwork) scoreMap := map[peer.ID]*pubsub.PeerScoreSnapshot{} var scoreMapMu sync.Mutex @@ -372,8 +375,15 @@ func newPeer(ctx context.Context, logger *zap.Logger, t *testing.T, msgValidator ScoreInspectorInterval: 100 * time.Millisecond, // TODO: add mock for peers.ScoreIndex } + db, err := kv.NewInMemory(logger, basedb.Options{}) + require.NoError(t, err) - ps, tm, err := NewPubSub(ctx, logger, cfg, metricsreporter.NewNop()) + _, validatorStore, err := registrystorage.NewSharesStorage(logger, db, []byte("test")) + if err != nil { + t.Fatal(err) + } + + ps, tm, err := NewPubSub(ctx, logger, cfg, metricsreporter.NewNop(), validatorStore) require.NoError(t, err) p = &P{ @@ -417,18 +427,18 @@ func dummyMsg(pkHex string, height int, malformed bool) (*spectypes.SSVMessage, return nil, err } - id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk, spectypes.BNRoleAttester) + id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk, spectypes.RoleCommittee) signature, err := base64.StdEncoding.DecodeString("sVV0fsvqQlqliKv/ussGIatxpe8LDWhc9uoaM5WpjbiYvvxUr1eCpz0ja7UT1PGNDdmoGi6xbMC1g/ozhAt4uCdpy0Xdfqbv2hMf2iRL5ZPKOSmMifHbd8yg4PeeceyN") if err != nil { return nil, err } - signedMessage := specqbft.SignedMessage{ + signedMessage := genesisspecqbft.SignedMessage{ Signature: signature, Signers: []spectypes.OperatorID{1, 3, 4}, - Message: specqbft.Message{ - MsgType: specqbft.RoundChangeMsgType, - Height: specqbft.Height(height), + Message: genesisspecqbft.Message{ + MsgType: genesisspecqbft.RoundChangeMsgType, + Height: genesisspecqbft.Height(height), Round: 2, Identifier: id[:], Root: [32]byte{}, @@ -463,19 +473,6 @@ func (m *DummyMessageValidator) ValidatorForTopic(topic string) func(ctx context } } -func (m *DummyMessageValidator) ValidatePubsubMessage(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult { +func (m *DummyMessageValidator) Validate(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult { return pubsub.ValidationAccept } - -func (m *DummyMessageValidator) ValidateSSVMessage(msg *queue.DecodedSSVMessage) (*queue.DecodedSSVMessage, validation.Descriptor, error) { - var descriptor validation.Descriptor - - validatorPK := msg.SSVMessage.GetID().GetPubKey() - role := msg.SSVMessage.GetID().GetRoleType() - descriptor.Role = role - descriptor.ValidatorPK = validatorPK - - descriptor.SSVMessageType = msg.SSVMessage.GetType() - - return msg, descriptor, nil -} diff --git a/network/topics/msg_validator_test.go b/network/topics/msg_validator_test.go index b5e2c0679d..e80d4fa5ca 100644 --- a/network/topics/msg_validator_test.go +++ b/network/topics/msg_validator_test.go @@ -5,6 +5,8 @@ import ( "crypto" "crypto/rsa" "crypto/sha256" + genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types" + genesisvalidation "github.com/ssvlabs/ssv/message/validation/genesis" "testing" v1 "github.com/attestantio/go-eth2-client/api/v1" @@ -18,7 +20,6 @@ import ( "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils" - "github.com/ssvlabs/ssv/message/validation" "github.com/ssvlabs/ssv/network/commons" "github.com/ssvlabs/ssv/networkconfig" operatorstorage "github.com/ssvlabs/ssv/operator/storage" @@ -40,7 +41,7 @@ func TestMsgValidator(t *testing.T) { ks := spectestingutils.Testing4SharesSet() share := &ssvtypes.SSVShare{ - Share: *spectestingutils.TestingShare(ks), + Share: *spectestingutils.TestingShare(ks, 1), Metadata: ssvtypes.Metadata{ BeaconMetadata: &beaconprotocol.ValidatorMetadata{ Status: v1.ValidatorStateActiveOngoing, @@ -50,7 +51,7 @@ func TestMsgValidator(t *testing.T) { } require.NoError(t, ns.Shares().Save(nil, share)) - mv := validation.NewMessageValidator(networkconfig.TestNetwork, validation.WithNodeStorage(ns)) + mv := genesisvalidation.New(networkconfig.TestNetwork, genesisvalidation.WithNodeStorage(ns)) require.NotNil(t, mv) slot := networkconfig.TestNetwork.Beacon.GetBeaconNetwork().EstimatedCurrentSlot() @@ -86,11 +87,11 @@ func TestMsgValidator(t *testing.T) { signature, err := rsa.SignPKCS1v15(nil, operatorPrivateKey, crypto.SHA256, hash[:]) require.NoError(t, err) - sig := [256]byte{} + var sig []byte copy(sig[:], signature) - packedPubSubMsgPayload := spectypes.EncodeSignedSSVMessage(encodedMsg, operatorId, sig) - topicID := commons.ValidatorTopicID(ssvMsg.GetID().GetPubKey()) + packedPubSubMsgPayload := genesisspectypes.EncodeSignedSSVMessage(encodedMsg, operatorId, sig) + topicID := commons.ValidatorTopicID(ssvMsg.GetID().GetDutyExecutorID()) pmsg := &pubsub.Message{ Message: &pspb.Message{ @@ -98,8 +99,7 @@ func TestMsgValidator(t *testing.T) { Data: packedPubSubMsgPayload, }, } - - res := mv.ValidatePubsubMessage(context.Background(), "16Uiu2HAkyWQyCb6reWXGQeBUt9EXArk6h3aq3PsFMwLNq3pPGH1r", pmsg) + res := mv.Validate(context.Background(), "16Uiu2HAkyWQyCb6reWXGQeBUt9EXArk6h3aq3PsFMwLNq3pPGH1r", pmsg) require.Equal(t, pubsub.ValidationAccept, res) }) @@ -120,7 +120,7 @@ func TestMsgValidator(t *testing.T) { t.Run("empty message", func(t *testing.T) { pmsg := newPBMsg([]byte{}, "xxx", []byte{}) - res := mv.ValidatePubsubMessage(context.Background(), "xxxx", pmsg) + res := mv.Validate(context.Background(), "xxxx", pmsg) require.Equal(t, pubsub.ValidationReject, res) }) @@ -147,9 +147,9 @@ func newPBMsg(data []byte, topic string, from []byte) *pubsub.Message { } func dummySSVConsensusMsg(pk spectypes.ValidatorPK, height qbft.Height) (*spectypes.SSVMessage, error) { - id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk, spectypes.BNRoleAttester) + id := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), pk[:], spectypes.RoleCommittee) ks := spectestingutils.Testing4SharesSet() - validSignedMessage := spectestingutils.TestingRoundChangeMessageWithHeightAndIdentifier(ks.Shares[1], 1, height, id[:]) + validSignedMessage := spectestingutils.TestingRoundChangeMessageWithHeightAndIdentifier(ks.OperatorKeys[1], 1, height, id[:]) encodedSignedMessage, err := validSignedMessage.Encode() if err != nil { diff --git a/network/topics/params/message_rate.go b/network/topics/params/message_rate.go index b5e01bd480..a64f4a07d8 100644 --- a/network/topics/params/message_rate.go +++ b/network/topics/params/message_rate.go @@ -110,7 +110,7 @@ func expectedSingleSCCommitteeDutiesPerEpochCached(numValidators int) float64 { // Calculates the message rate for a topic given its committees' configurations (number of operators and number of validators) func calculateMessageRateForTopic(committees []*storage.Committee) float64 { - if committees == nil || len(committees) == 0 { + if len(committees) == 0 { return 0 } diff --git a/network/topics/params/topic_score.go b/network/topics/params/topic_score.go index bc3f92e1bb..7eebc8281c 100644 --- a/network/topics/params/topic_score.go +++ b/network/topics/params/topic_score.go @@ -182,6 +182,23 @@ func NewSubnetTopicOpts(activeValidators, subnets int, committees []*storage.Com return opts } +// NewSubnetTopicOpts creates new TopicOpts for a subnet topic +func NewSubnetTopicOptsValidators(activeValidators, subnets int) *Options { + // Create options with default values + opts := NewOpts(activeValidators, subnets) + opts.defaults() + + // Set topic weight with equal weights + opts.Topic.TopicWeight = opts.Network.TotalTopicsWeight / float64(opts.Network.Subnets) + + // Set expected message rate based on stage metrics + validatorsPerSubnet := float64(opts.Network.ActiveValidators) / float64(opts.Network.Subnets) + msgsPerValidatorPerSecond := 600.0 / 10000.0 + opts.Topic.ExpectedMsgRate = validatorsPerSubnet * msgsPerValidatorPerSecond + + return opts +} + // TopicParams creates pubsub.TopicScoreParams from the given TopicOpts // implementation is based on ETH2.0, with alignments to ssv: // https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c diff --git a/network/topics/pubsub.go b/network/topics/pubsub.go index 58904f6562..b0826e70f5 100644 --- a/network/topics/pubsub.go +++ b/network/topics/pubsub.go @@ -5,6 +5,8 @@ import ( "net" "time" + "github.com/ssvlabs/ssv/networkconfig" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/host" @@ -44,6 +46,8 @@ const ( // PubSubConfig is the needed config to instantiate pubsub type PubSubConfig struct { + NetworkConfig networkconfig.NetworkConfig + Host host.Host TraceLog bool StaticPeers []peer.AddrInfo @@ -169,7 +173,13 @@ func NewPubSub(ctx context.Context, logger *zap.Logger, cfg *PubSubConfig, metri return 100, 100, 10, nil } } - topicScoreFactory = topicScoreParams(logger, cfg, committeesProvider) + + topicScoreFactory = func(t string) *pubsub.TopicScoreParams { + if cfg.NetworkConfig.PastAlanFork() { + return topicScoreParams(logger, cfg, committeesProvider)(t) + } + return validatorTopicScoreParams(logger, cfg)(t) + } } if cfg.MsgIDHandler != nil { diff --git a/network/topics/scoring.go b/network/topics/scoring.go index 129185edd8..8463ee5912 100644 --- a/network/topics/scoring.go +++ b/network/topics/scoring.go @@ -135,6 +135,27 @@ func topicScoreParams(logger *zap.Logger, cfg *PubSubConfig, committeesProvider } } +// topicScoreParams factory for creating scoring params for topics +func validatorTopicScoreParams(logger *zap.Logger, cfg *PubSubConfig) func(string) *pubsub.TopicScoreParams { + return func(t string) *pubsub.TopicScoreParams { + totalValidators, activeValidators, myValidators, err := cfg.GetValidatorStats() + if err != nil { + logger.Debug("could not read stats: active validators") + return nil + } + logger := logger.With(zap.String("topic", t), zap.Uint64("totalValidators", totalValidators), + zap.Uint64("activeValidators", activeValidators), zap.Uint64("myValidators", myValidators)) + logger.Debug("got validator stats for score params") + opts := params.NewSubnetTopicOptsValidators(int(totalValidators), commons.Subnets()) + tp, err := params.TopicParams(opts) + if err != nil { + logger.Debug("ignoring topic score params", zap.Error(err)) + return nil + } + return tp + } +} + // Returns a new committee list with only the committees that belong to the given topic func filterCommitteesForTopic(topic string, committees []*storage.Committee) []*storage.Committee { diff --git a/operator/validator/controller.go b/operator/validator/controller.go index f295307df6..21d683339b 100644 --- a/operator/validator/controller.go +++ b/operator/validator/controller.go @@ -12,7 +12,6 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethereum/go-ethereum/common" "github.com/jellydator/ttlcache/v3" - "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/exp/maps" @@ -144,7 +143,6 @@ type SharesStorage interface { type P2PNetwork interface { protocolp2p.Broadcaster UseMessageRouter(router network.MessageRouter) - Peers(pk spectypes.ValidatorPK) ([]peer.ID, error) SubscribeRandoms(logger *zap.Logger, numSubnets int) error RegisterHandlers(logger *zap.Logger, handlers ...*p2pprotocol.SyncHandler) } @@ -188,7 +186,6 @@ type controller struct { committeesObserversMutex sync.Mutex recentlyStartedValidators uint64 - recentlyStartedCommittees uint64 metadataLastUpdated map[spectypes.ValidatorPK]time.Time indicesChange chan struct{} validatorExitCh chan duties.ExitDescriptor @@ -699,7 +696,7 @@ func (c *controller) GetValidator(pubKey spectypes.ValidatorPK) (*validator.Vali } func (c *controller) ExecuteGenesisDuty(logger *zap.Logger, duty *genesisspectypes.Duty) { - panic("implement me") + } func (c *controller) ExecuteDuty(logger *zap.Logger, duty *spectypes.BeaconDuty) { @@ -887,11 +884,6 @@ func (c *controller) onShareStop(pubKey spectypes.ValidatorPK) { } } -// todo wrapper to start both validator and committee -type starter interface { - Start() error -} - func (c *controller) onShareInit(share *ssvtypes.SSVShare) (*validator.Validator, *validator.Committee, error) { if !share.HasBeaconMetadata() { // fetching index and status in case not exist c.logger.Warn("skipping validator until it becomes active", fields.PubKey(share.ValidatorPubKey[:])) @@ -931,7 +923,8 @@ func (c *controller) onShareInit(share *ssvtypes.SSVShare) (*validator.Validator if !found { // Share context with both the validator and the runners, // so that when the validator is stopped, the runners are stopped as well. - ctx, _ := context.WithCancel(c.context) + ctx, cancel := context.WithCancel(c.context) + _ = cancel opts := c.validatorOptions opts.SSVShare = share diff --git a/protocol/v2/p2p/network.go b/protocol/v2/p2p/network.go index 3823f28faa..9db650b4d3 100644 --- a/protocol/v2/p2p/network.go +++ b/protocol/v2/p2p/network.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" - "github.com/libp2p/go-libp2p/core/peer" "github.com/ssvlabs/ssv-spec/p2p" specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" @@ -25,7 +24,6 @@ type Subscriber interface { // Unsubscribe unsubscribes from the validator subnet Unsubscribe(logger *zap.Logger, pk spectypes.ValidatorPK) error // Peers returns the peers that are connected to the given validator - Peers(pk spectypes.ValidatorPK) ([]peer.ID, error) } // Broadcaster enables to broadcast messages diff --git a/protocol/v2/qbft/instance/instance.go b/protocol/v2/qbft/instance/instance.go index 268ece9730..0606cea113 100644 --- a/protocol/v2/qbft/instance/instance.go +++ b/protocol/v2/qbft/instance/instance.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "encoding/json" "sync" - "time" "github.com/pkg/errors" spectypes "github.com/ssvlabs/ssv-spec/types" @@ -27,7 +26,6 @@ type Instance struct { forceStop bool StartValue []byte - started time.Time metrics *metrics } @@ -37,7 +35,7 @@ func NewInstance( identifier []byte, height specqbft.Height, ) *Instance { - var name = "" + var name string if len(identifier) == 56 { name = spectypes.MessageID(identifier).GetRoleType().String() } else { @@ -62,15 +60,6 @@ func NewInstance( } } -// TODO remove -func messageIDFromBytes(mid []byte) spectypes.MessageID { - if len(mid) < 56 { - return spectypes.MessageID{} - } - - return spectypes.MessageID(mid) -} - func (i *Instance) ForceStop() { i.forceStop = true } diff --git a/protocol/v2/qbft/instance/round_change.go b/protocol/v2/qbft/instance/round_change.go index c69677a957..1cd1d38073 100644 --- a/protocol/v2/qbft/instance/round_change.go +++ b/protocol/v2/qbft/instance/round_change.go @@ -118,7 +118,9 @@ func (i *Instance) uponChangeRoundPartialQuorum(logger *zap.Logger, newRound spe } root, err := specqbft.HashDataRoot(instanceStartValue) - + if err != nil { + return errors.Wrap(err, "failed to hash instance start value") + } logger.Debug("📢 got partial quorum, broadcasting round change message", fields.Round(i.State.Round), fields.Root(root), diff --git a/protocol/v2/qbft/spectest/controller_type.go b/protocol/v2/qbft/spectest/controller_type.go index 94a956146e..9d9bf733e3 100644 --- a/protocol/v2/qbft/spectest/controller_type.go +++ b/protocol/v2/qbft/spectest/controller_type.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/ssvlabs/ssv/exporter/convert" "os" "path/filepath" "reflect" @@ -58,7 +59,7 @@ func RunControllerSpecTest(t *testing.T, test *spectests.ControllerSpecTest) { func generateController(logger *zap.Logger) *controller.Controller { identifier := []byte{1, 2, 3, 4} - config := qbfttesting.TestingConfig(logger, spectestingutils.Testing4SharesSet(), spectypes.RoleCommittee) + config := qbfttesting.TestingConfig(logger, spectestingutils.Testing4SharesSet(), convert.RoleCommittee) return qbfttesting.NewTestingQBFTController( identifier[:], spectestingutils.TestingCommitteeMember(spectestingutils.Testing4SharesSet()), diff --git a/protocol/v2/qbft/spectest/msg_processing_type.go b/protocol/v2/qbft/spectest/msg_processing_type.go index eef4748d15..673212088e 100644 --- a/protocol/v2/qbft/spectest/msg_processing_type.go +++ b/protocol/v2/qbft/spectest/msg_processing_type.go @@ -3,6 +3,7 @@ package qbft import ( "encoding/hex" "fmt" + "github.com/ssvlabs/ssv/exporter/convert" "path/filepath" "reflect" "testing" @@ -31,7 +32,7 @@ func RunMsgProcessing(t *testing.T, test *spectests.MsgProcessingSpecTest) { msgId := specqbft.ControllerIdToMessageID(test.Pre.State.ID) logger := logging.TestLogger(t) pre := instance.NewInstance( - qbfttesting.TestingConfig(logger, spectestingutils.KeySetForCommitteeMember(test.Pre.State.CommitteeMember), msgId.GetRoleType()), + qbfttesting.TestingConfig(logger, spectestingutils.KeySetForCommitteeMember(test.Pre.State.CommitteeMember), convert.RunnerRole(msgId.GetRoleType())), test.Pre.State.CommitteeMember, test.Pre.State.ID, test.Pre.State.Height, diff --git a/protocol/v2/qbft/spectest/qbft_mapping_test.go b/protocol/v2/qbft/spectest/qbft_mapping_test.go index 486b3a208e..bac7f00258 100644 --- a/protocol/v2/qbft/spectest/qbft_mapping_test.go +++ b/protocol/v2/qbft/spectest/qbft_mapping_test.go @@ -2,6 +2,7 @@ package qbft import ( "encoding/json" + "github.com/ssvlabs/ssv/exporter/convert" "os" "reflect" "strings" @@ -9,7 +10,6 @@ import ( spectests "github.com/ssvlabs/ssv-spec/qbft/spectest/tests" "github.com/ssvlabs/ssv-spec/qbft/spectest/tests/timeout" - spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv-spec/types/testingutils" "github.com/stretchr/testify/require" @@ -100,7 +100,7 @@ func TestQBFTMapping(t *testing.T) { preByts, _ := typedTest.Pre.Encode() logger := logging.TestLogger(t) pre := instance.NewInstance( - testing2.TestingConfig(logger, testingutils.KeySetForCommitteeMember(typedTest.Pre.State.CommitteeMember), spectypes.RoleCommittee), + testing2.TestingConfig(logger, testingutils.KeySetForCommitteeMember(typedTest.Pre.State.CommitteeMember), convert.RoleCommittee), typedTest.Pre.State.CommitteeMember, typedTest.Pre.State.ID, typedTest.Pre.State.Height, @@ -108,8 +108,11 @@ func TestQBFTMapping(t *testing.T) { err = pre.Decode(preByts) require.NoError(t, err) typedTest.Pre = pre + t.Run(typedTest.Name, func(t *testing.T) { // using only spec struct so no need to run our version (TODO: check how we choose leader) + t.Parallel() + RunTimeout(t, typedTest) + }) - RunTimeout(t, typedTest) default: t.Fatalf("unsupported test type %s [%s]", testType, testName) } diff --git a/protocol/v2/qbft/testing/utils.go b/protocol/v2/qbft/testing/utils.go index f9dcaa016e..de0adcd2ba 100644 --- a/protocol/v2/qbft/testing/utils.go +++ b/protocol/v2/qbft/testing/utils.go @@ -15,7 +15,7 @@ import ( "github.com/ssvlabs/ssv/protocol/v2/qbft/controller" ) -var TestingConfig = func(logger *zap.Logger, keySet *testingutils.TestKeySet, role types.RunnerRole) *qbft.Config { +var TestingConfig = func(logger *zap.Logger, keySet *testingutils.TestKeySet, role convert.RunnerRole) *qbft.Config { return &qbft.Config{ BeaconSigner: testingutils.NewTestingKeyManager(), OperatorSigner: testingutils.NewTestingOperatorSigner(keySet, 1), @@ -35,7 +35,7 @@ var TestingConfig = func(logger *zap.Logger, keySet *testingutils.TestKeySet, ro ProposerF: func(state *specqbft.State, round specqbft.Round) types.OperatorID { return 1 }, - Storage: TestingStores(logger).Get(convert.RunnerRole(role)), + Storage: TestingStores(logger).Get(role), Network: testingutils.NewTestingNetwork(1, keySet.OperatorKeys[1]), Timer: roundtimer.NewTestingTimer(), SignatureVerification: true, diff --git a/protocol/v2/ssv/runner/committee.go b/protocol/v2/ssv/runner/committee.go index 528d2bd5a6..7b981d4b45 100644 --- a/protocol/v2/ssv/runner/committee.go +++ b/protocol/v2/ssv/runner/committee.go @@ -39,7 +39,6 @@ type CommitteeRunner struct { beacon beacon.BeaconNode signer types.BeaconSigner operatorSigner types.OperatorSigner - domain spectypes.DomainType valCheck specqbft.ProposedValueCheckF stoppedValidators map[spectypes.ValidatorPK]struct{} @@ -419,44 +418,50 @@ func (cr *CommitteeRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *t } } } + logger = logger.With(durationFields...) // Submit multiple attestations - attestations := make([]*phase0.Attestation, 0) + attestations := make([]*phase0.Attestation, 0, len(attestationsToSubmit)) for _, att := range attestationsToSubmit { attestations = append(attestations, att) } - submmitionStart := time.Now() - if err := cr.beacon.SubmitAttestations(attestations); err != nil { - logger.Error("❌ failed to submit attestation", zap.Error(err)) - return errors.Wrap(err, "could not submit to Beacon chain reconstructed attestation") - } - logger.Info("✅ successfully submitted attestations", - fields.SubmissionTime(time.Since(submmitionStart)), - fields.Height(cr.BaseRunner.QBFTController.Height), - fields.Round(cr.BaseRunner.State.RunningInstance.State.Round)) - // Record successful submissions - for validator := range attestationsToSubmit { - cr.RecordSubmission(types.BNRoleAttester, validator) + if len(attestations) > 0 { + submissionStart := time.Now() + if err := cr.beacon.SubmitAttestations(attestations); err != nil { + logger.Error("❌ failed to submit attestation", zap.Error(err)) + return errors.Wrap(err, "could not submit to Beacon chain reconstructed attestation") + } + + logger.Info("✅ successfully submitted attestations", + fields.SubmissionTime(time.Since(submissionStart)), + fields.Height(cr.BaseRunner.QBFTController.Height), + fields.Round(cr.BaseRunner.State.RunningInstance.State.Round)) + // Record successful submissions + for validator := range attestationsToSubmit { + cr.RecordSubmission(types.BNRoleAttester, validator) + } } // Submit multiple sync committee - syncCommitteeMessages := make([]*altair.SyncCommitteeMessage, 0) + syncCommitteeMessages := make([]*altair.SyncCommitteeMessage, 0, len(syncCommitteeMessagesToSubmit)) for _, syncMsg := range syncCommitteeMessagesToSubmit { syncCommitteeMessages = append(syncCommitteeMessages, syncMsg) } - submmitionStart = time.Now() - if err := cr.beacon.SubmitSyncMessages(syncCommitteeMessages); err != nil { - logger.Error("❌ failed to submit sync committee", zap.Error(err)) - return errors.Wrap(err, "could not submit to Beacon chain reconstructed signed sync committee") - } - logger.Info("✅ successfully submitted sync committee", - fields.SubmissionTime(time.Since(submmitionStart)), - fields.Height(cr.BaseRunner.QBFTController.Height), - fields.Round(cr.BaseRunner.State.RunningInstance.State.Round)) - // Record successful submissions - for validator := range syncCommitteeMessagesToSubmit { - cr.RecordSubmission(types.BNRoleSyncCommittee, validator) + if len(syncCommitteeMessages) > 0 { + submissionStart := time.Now() + if err := cr.beacon.SubmitSyncMessages(syncCommitteeMessages); err != nil { + logger.Error("❌ failed to submit sync committee", zap.Error(err)) + return errors.Wrap(err, "could not submit to Beacon chain reconstructed signed sync committee") + } + logger.Info("✅ successfully submitted sync committee", + fields.SubmissionTime(time.Since(submissionStart)), + fields.Height(cr.BaseRunner.QBFTController.Height), + fields.Round(cr.BaseRunner.State.RunningInstance.State.Round)) + // Record successful submissions + for validator := range syncCommitteeMessagesToSubmit { + cr.RecordSubmission(types.BNRoleSyncCommittee, validator) + } } if anyErr != nil { @@ -567,8 +572,11 @@ func (cr *CommitteeRunner) expectedPostConsensusRootsAndBeaconObjects() ( return nil, nil, nil, errors.Wrap(err, "could not decode beacon vote") } for _, beaconDuty := range duty.(*types.CommitteeDuty).BeaconDuties { + if beaconDuty == nil { + continue + } _, stopped := cr.stoppedValidators[spectypes.ValidatorPK(beaconDuty.PubKey)] - if beaconDuty == nil || stopped { + if stopped { continue } slot := beaconDuty.DutySlot() diff --git a/protocol/v2/ssv/runner/sync_committee_aggregator.go b/protocol/v2/ssv/runner/sync_committee_aggregator.go index 1a02e4f162..0d3774570c 100644 --- a/protocol/v2/ssv/runner/sync_committee_aggregator.go +++ b/protocol/v2/ssv/runner/sync_committee_aggregator.go @@ -228,6 +228,9 @@ func (r *SyncCommitteeAggregatorRunner) ProcessPostConsensus(logger *zap.Logger, r.metrics.EndPostConsensus() consensusData, err := spectypes.CreateConsensusData(r.GetState().DecidedValue) + if err != nil { + return errors.Wrap(err, "failed to decode decided value") + } // get contributions contributions, err := consensusData.GetSyncCommitteeContributions() diff --git a/protocol/v2/ssv/spectest/debug_states.go b/protocol/v2/ssv/spectest/debug_states.go index cc639b4d76..efd80cfa7b 100644 --- a/protocol/v2/ssv/spectest/debug_states.go +++ b/protocol/v2/ssv/spectest/debug_states.go @@ -49,6 +49,6 @@ func dumpState(t *testing.T, func logJSON(t *testing.T, name string, value interface{}) { bytes, err := json.Marshal(value) require.NoError(t, err) - err = os.WriteFile(fmt.Sprintf("%s/%s_test_serialized.json", dumpDir, name), bytes, 0644) + err = os.WriteFile(fmt.Sprintf("%s/%s_test_serialized.json", dumpDir, name), bytes, 0600) require.NoError(t, err) } diff --git a/protocol/v2/ssv/spectest/msg_processing_type.go b/protocol/v2/ssv/spectest/msg_processing_type.go index 288c52d4c3..0cd0c01292 100644 --- a/protocol/v2/ssv/spectest/msg_processing_type.go +++ b/protocol/v2/ssv/spectest/msg_processing_type.go @@ -134,7 +134,7 @@ func (test *MsgProcessingSpecTest) RunAsPartOfMultiTest(t *testing.T, logger *za } network := &spectestingutils.TestingNetwork{} - beaconNetwork := tests.NewTestingBeaconNodeWrapped() + var beaconNetwork *tests.TestingBeaconNodeWrapped var committee []*spectypes.Operator switch test.Runner.(type) { @@ -150,14 +150,14 @@ func (test *MsgProcessingSpecTest) RunAsPartOfMultiTest(t *testing.T, logger *za default: network = v.Network.(*spectestingutils.TestingNetwork) committee = v.Operator.Committee - beaconNetwork = test.Runner.GetBeaconNode() + beaconNetwork = test.Runner.GetBeaconNode().(*tests.TestingBeaconNodeWrapped) } // test output message spectestingutils.ComparePartialSignatureOutputMessages(t, test.OutputMessages, network.BroadcastedMsgs, committee) // test beacon broadcasted msgs - spectestingutils.CompareBroadcastedBeaconMsgs(t, test.BeaconBroadcastedRoots, beaconNetwork.(*tests.TestingBeaconNodeWrapped).GetBroadcastedRoots()) + spectestingutils.CompareBroadcastedBeaconMsgs(t, test.BeaconBroadcastedRoots, beaconNetwork.GetBroadcastedRoots()) // post root postRoot, err := test.Runner.GetRoot() @@ -169,20 +169,20 @@ func (test *MsgProcessingSpecTest) RunAsPartOfMultiTest(t *testing.T, logger *za } } -func (test *MsgProcessingSpecTest) compareBroadcastedBeaconMsgs(t *testing.T) { - broadcastedRoots := test.Runner.GetBeaconNode().(*tests.TestingBeaconNodeWrapped).GetBroadcastedRoots() - require.Len(t, broadcastedRoots, len(test.BeaconBroadcastedRoots)) - for _, r1 := range test.BeaconBroadcastedRoots { - found := false - for _, r2 := range broadcastedRoots { - if r1 == hex.EncodeToString(r2[:]) { - found = true - break - } - } - require.Truef(t, found, "broadcasted beacon root not found") - } -} +//func (test *MsgProcessingSpecTest) compareBroadcastedBeaconMsgs(t *testing.T) { +// broadcastedRoots := test.Runner.GetBeaconNode().(*tests.TestingBeaconNodeWrapped).GetBroadcastedRoots() +// require.Len(t, broadcastedRoots, len(test.BeaconBroadcastedRoots)) +// for _, r1 := range test.BeaconBroadcastedRoots { +// found := false +// for _, r2 := range broadcastedRoots { +// if r1 == hex.EncodeToString(r2[:]) { +// found = true +// break +// } +// } +// require.Truef(t, found, "broadcasted beacon root not found") +// } +//} func (test *MsgProcessingSpecTest) overrideStateComparison(t *testing.T) { testType := reflect.TypeOf(test).String() @@ -240,7 +240,7 @@ var baseCommitteeWithRunnerSample = func( shareMap[valIdx] = spectestingutils.TestingShare(ks, valIdx) } - createRunnerF := func(_ phase0.Slot, shareMap map[phase0.ValidatorIndex]*spectypes.Share, slashableValidators []spectypes.ShareValidatorPK) *runner.CommitteeRunner { + createRunnerF := func(_ phase0.Slot, shareMap map[phase0.ValidatorIndex]*spectypes.Share, _ []spectypes.ShareValidatorPK) *runner.CommitteeRunner { return runner.NewCommitteeRunner( networkconfig.TestNetwork, shareMap, diff --git a/protocol/v2/ssv/spectest/ssv_mapping_test.go b/protocol/v2/ssv/spectest/ssv_mapping_test.go index a46d06e419..48b3fe115e 100644 --- a/protocol/v2/ssv/spectest/ssv_mapping_test.go +++ b/protocol/v2/ssv/spectest/ssv_mapping_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/ssvlabs/ssv/exporter/convert" tests2 "github.com/ssvlabs/ssv/integration/qbft/tests" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/protocol/v2/qbft/controller" @@ -78,11 +79,6 @@ func prepareTest(t *testing.T, logger *zap.Logger, name string, test interface{} switch testType { case reflect.TypeOf(&tests.MsgProcessingSpecTest{}).String(): - // TODO: fix blinded test - if strings.Contains(testName, "propose regular decide blinded") || strings.Contains(testName, "propose blinded decide regular") { - logger.Info("skipping blinded block test", zap.String("test", testName)) - return nil - } typedTest := msgProcessingSpecTestFromMap(t, test.(map[string]interface{})) return &runnable{ @@ -394,7 +390,7 @@ func fixRunnerForRun(t *testing.T, runnerMap map[string]interface{}, ks *spectes } func fixControllerForRun(t *testing.T, logger *zap.Logger, runner runner.Runner, contr *controller.Controller, ks *spectestingutils.TestKeySet) *controller.Controller { - config := qbfttesting.TestingConfig(logger, ks, spectypes.RoleCommittee) + config := qbfttesting.TestingConfig(logger, ks, convert.RoleCommittee) config.ValueCheckF = runner.GetValCheckF() newContr := controller.NewController( contr.Identifier, @@ -555,7 +551,7 @@ func fixCommitteeForRun(t *testing.T, ctx context.Context, logger *zap.Logger, c tests2.NewTestingBeaconNodeWrapped().GetBeaconNetwork(), &specCommittee.CommitteeMember, testingutils.NewTestingVerifier(), - func(slot phase0.Slot, shareMap map[phase0.ValidatorIndex]*spectypes.Share, slashableValidators []spectypes.ShareValidatorPK) *runner.CommitteeRunner { + func(slot phase0.Slot, shareMap map[phase0.ValidatorIndex]*spectypes.Share, _ []spectypes.ShareValidatorPK) *runner.CommitteeRunner { return ssvtesting.CommitteeRunnerWithShareMap(logger, shareMap).(*runner.CommitteeRunner) }, ) diff --git a/protocol/v2/ssv/testing/runner.go b/protocol/v2/ssv/testing/runner.go index 72a078f1d9..0742895bb7 100644 --- a/protocol/v2/ssv/testing/runner.go +++ b/protocol/v2/ssv/testing/runner.go @@ -2,6 +2,7 @@ package testing import ( "bytes" + "github.com/ssvlabs/ssv/exporter/convert" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ssvlabs/ssv/integration/qbft/tests" @@ -77,13 +78,13 @@ var baseRunner = func( keySet *spectestingutils.TestKeySet, ) runner.Runner { share := spectestingutils.TestingShare(keySet, spectestingutils.TestingValidatorIndex) - identifier := spectypes.NewMsgID(TestingSSVDomainType, spectestingutils.TestingValidatorPubKey[:], spectypes.RunnerRole(role)) + identifier := spectypes.NewMsgID(TestingSSVDomainType, spectestingutils.TestingValidatorPubKey[:], role) net := spectestingutils.NewTestingNetwork(1, keySet.OperatorKeys[1]) km := spectestingutils.NewTestingKeyManager() operator := spectestingutils.TestingCommitteeMember(keySet) opSigner := spectestingutils.NewTestingOperatorSigner(keySet, 1) - config := testing.TestingConfig(logger, keySet, identifier.GetRoleType()) + config := testing.TestingConfig(logger, keySet, convert.RunnerRole(identifier.GetRoleType())) config.ValueCheckF = valCheck config.ProposerF = func(state *specqbft.State, round specqbft.Round) spectypes.OperatorID { return 1 @@ -114,7 +115,7 @@ var baseRunner = func( km, opSigner, valCheck, - ).(runner.Runner) + ) case spectypes.RoleAggregator: return runner.NewAggregatorRunner( networkconfig.TestNetwork, @@ -127,7 +128,7 @@ var baseRunner = func( opSigner, valCheck, TestingHighestDecidedSlot, - ).(runner.Runner) + ) case spectypes.RoleProposer: return runner.NewProposerRunner( networkconfig.TestNetwork, @@ -140,7 +141,7 @@ var baseRunner = func( opSigner, valCheck, TestingHighestDecidedSlot, - ).(runner.Runner) + ) case spectypes.RoleSyncCommitteeContribution: return runner.NewSyncCommitteeAggregatorRunner( networkconfig.TestNetwork, @@ -153,7 +154,7 @@ var baseRunner = func( opSigner, valCheck, TestingHighestDecidedSlot, - ).(runner.Runner) + ) case spectypes.RoleValidatorRegistration: return runner.NewValidatorRegistrationRunner( networkconfig.TestNetwork, @@ -164,7 +165,7 @@ var baseRunner = func( net, km, opSigner, - ).(runner.Runner) + ) case spectypes.RoleVoluntaryExit: return runner.NewVoluntaryExitRunner( networkconfig.TestNetwork, @@ -174,7 +175,7 @@ var baseRunner = func( net, km, opSigner, - ).(runner.Runner) + ) case spectestingutils.UnknownDutyType: ret := runner.NewCommitteeRunner( networkconfig.TestNetwork, @@ -187,7 +188,7 @@ var baseRunner = func( valCheck, ) ret.(*runner.CommitteeRunner).BaseRunner.RunnerRoleType = spectestingutils.UnknownDutyType - return ret.(runner.Runner) + return ret default: panic("unknown role type") } @@ -292,7 +293,7 @@ var baseRunnerWithShareMap = func( committeeMember := spectestingutils.TestingCommitteeMember(keySetInstance) opSigner := spectestingutils.NewTestingOperatorSigner(keySetInstance, committeeMember.OperatorID) - config := testing.TestingConfig(logger, keySetInstance, identifier.GetRoleType()) + config := testing.TestingConfig(logger, keySetInstance, convert.RunnerRole(identifier.GetRoleType())) config.ValueCheckF = valCheck config.ProposerF = func(state *specqbft.State, round specqbft.Round) spectypes.OperatorID { return 1 diff --git a/protocol/v2/ssv/validator/committee.go b/protocol/v2/ssv/validator/committee.go index fd0baa3f41..3b0a65f7eb 100644 --- a/protocol/v2/ssv/validator/committee.go +++ b/protocol/v2/ssv/validator/committee.go @@ -117,21 +117,20 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty) } slashableValidators := make([]spectypes.ShareValidatorPK, 0, len(duty.BeaconDuties)) - - validatorShares := make(map[phase0.ValidatorIndex]*spectypes.Share, len(duty.BeaconDuties)) - toRemove := make([]int, 0) + //validatorShares := make(map[phase0.ValidatorIndex]*spectypes.Share, len(duty.BeaconDuties)) + //toRemove := make([]int, 0) // Remove beacon duties that don't have a share - for i, bd := range duty.BeaconDuties { - share, ok := c.Shares[bd.ValidatorIndex] - if !ok { - toRemove = append(toRemove, i) - continue - } - if bd.Type == spectypes.BNRoleAttester { - slashableValidators = append(slashableValidators, share.SharePubKey) - } - validatorShares[bd.ValidatorIndex] = share - } + //for i, bd := range duty.BeaconDuties { + // share, ok := c.Shares[bd.ValidatorIndex] + // if !ok { + // toRemove = append(toRemove, i) + // continue + // } + // if bd.Type == spectypes.BNRoleAttester { + // slashableValidators = append(slashableValidators, share.SharePubKey) + // } + // validatorShares[bd.ValidatorIndex] = share + //} // TODO bring this back when https://github.com/ssvlabs/ssv-spec/pull/467 is merged and spec is aligned //// Remove beacon duties that don't have a share @@ -151,6 +150,15 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty) // TODO REMOVE this after https://github.com/ssvlabs/ssv-spec/pull/467 is merged and we are aligned to the spec // and pas validatorShares instead of sharesCopy the runner // --> + for _, bd := range duty.BeaconDuties { + share, ok := c.Shares[bd.ValidatorIndex] + if !ok { + continue + } + if bd.Type == spectypes.BNRoleAttester { + slashableValidators = append(slashableValidators, share.SharePubKey) + } + } var sharesCopy = make(map[phase0.ValidatorIndex]*spectypes.Share, len(c.Shares)) for k, v := range c.Shares { sharesCopy[k] = v @@ -347,21 +355,6 @@ func (c *Committee) UnmarshalJSON(data []byte) error { return nil } -// updateAttestingSlotMap updates the highest attesting slot map from beacon duties -func (c *Committee) updateAttestingSlotMap(duty *spectypes.CommitteeDuty) { - for _, beaconDuty := range duty.BeaconDuties { - if beaconDuty.Type == spectypes.BNRoleAttester { - validatorPK := spectypes.ValidatorPK(beaconDuty.PubKey) - if _, ok := c.HighestAttestingSlotMap[validatorPK]; !ok { - c.HighestAttestingSlotMap[validatorPK] = beaconDuty.Slot - } - if c.HighestAttestingSlotMap[validatorPK] < beaconDuty.Slot { - c.HighestAttestingSlotMap[validatorPK] = beaconDuty.Slot - } - } - } -} - func (c *Committee) validateMessage(msg *spectypes.SSVMessage) error { if !(c.Operator.CommitteeID.MessageIDBelongs(msg.GetID())) { return errors.New("msg ID doesn't match committee ID") diff --git a/protocol/v2/ssv/validator/non_committee_validator.go b/protocol/v2/ssv/validator/non_committee_validator.go index f509cd2d7d..502041d5ca 100644 --- a/protocol/v2/ssv/validator/non_committee_validator.go +++ b/protocol/v2/ssv/validator/non_committee_validator.go @@ -83,7 +83,7 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.DecodedSSVMessage) error return fmt.Errorf("failed to get partial signature message from network message %w", err) } if partialSigMessages.Type != spectypes.PostConsensusPartialSig { - return fmt.Errorf("not processing message type %b", partialSigMessages.Type) + return fmt.Errorf("not processing message type %d", partialSigMessages.Type) } slot := partialSigMessages.Slot diff --git a/protocol/v2/testing/test_utils.go b/protocol/v2/testing/test_utils.go index 8780a1e7c9..a1cdf42c28 100644 --- a/protocol/v2/testing/test_utils.go +++ b/protocol/v2/testing/test_utils.go @@ -114,17 +114,27 @@ func GetSSVMappingSpecTestJSON(path string, module string) ([]byte, error) { gzPath := filepath.Join(p, "spectest", "generate", "tests.json.gz") untypedTests := map[string]interface{}{} - file, err := os.Open(gzPath) + file, err := os.Open(gzPath) // #nosec G304 if err != nil { return nil, errors.Wrap(err, "failed to open gzip file") } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + // Handle the error, log it, or handle it as appropriate + log.Printf("Failed to close file: %v", err) + } + }() gzipReader, err := gzip.NewReader(file) if err != nil { return nil, errors.Wrap(err, "failed to create gzip reader") } - defer gzipReader.Close() + defer func() { + if err := gzipReader.Close(); err != nil { + // Handle the error, log it, or handle it as appropriate + log.Printf("Failed to close reader: %v", err) + } + }() decompressedData, err := io.ReadAll(gzipReader) if err != nil { @@ -253,7 +263,7 @@ func ExtractTarGz(gzipStream io.Reader) { tarReader := tar.NewReader(uncompressedStream) - for true { + for { header, err := tarReader.Next() if err == io.EOF { @@ -266,7 +276,7 @@ func ExtractTarGz(gzipStream io.Reader) { switch header.Typeflag { case tar.TypeDir: - if err := os.Mkdir(header.Name, 0755); err != nil { + if err := os.Mkdir(header.Name, 0750); err != nil { log.Fatalf("ExtractTarGz: Mkdir() failed: %s", err.Error()) } case tar.TypeReg: @@ -274,27 +284,27 @@ func ExtractTarGz(gzipStream io.Reader) { if err != nil { log.Fatalf("ExtractTarGz: Create() failed: %s", err.Error()) } - if _, err := io.Copy(outFile, tarReader); err != nil { + // Set a maximum size limit for the decompressed data + maxSize := int64(50 * 1024 * 1024) // 50 MB, adjust as needed + + // Wrap the tarReader with a LimitedReader + limitedReader := &io.LimitedReader{R: tarReader, N: maxSize} + + // Perform the copy operation with the limited reader + if _, err := io.Copy(outFile, limitedReader); err != nil { log.Fatalf("ExtractTarGz: Copy() failed: %s", err.Error()) } - outFile.Close() + err = outFile.Close() + if err != nil { + log.Fatalf("faild to close file: %s", err.Error()) + } default: log.Fatalf( - "ExtractTarGz: uknown type: %b in %s", + "ExtractTarGz: uknown type: %d in %s", header.Typeflag, header.Name) } } } - -func unpackTestsJson(path string) error { - r, err := os.Open(fmt.Sprintf("%s.gz", path)) - if err != nil { - errors.Wrap(err, "could not open file") - } - ExtractTarGz(r) - - return nil -} diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index cf5cb65f00..e2fbae9a38 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -307,15 +307,12 @@ func (c *validatorStore) handleShareUpdated(share *types.SSVShare) { // Update byCommitteeID for _, committee := range c.byCommitteeID { + if committee.ID != share.CommitteeID() { + continue + } for i, validator := range committee.Validators { if validator.ValidatorPubKey == share.ValidatorPubKey { committee.Validators[i] = share - break - } - } - - for i, index := range committee.Indices { - if index == share.ValidatorIndex { committee.Indices[i] = share.ValidatorIndex break }