Skip to content

Commit

Permalink
network/records: change subnets type to [commons.SubnetsCount]byte
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Oct 22, 2024
1 parent 0bc6433 commit 6b09804
Show file tree
Hide file tree
Showing 20 changed files with 140 additions and 143 deletions.
5 changes: 4 additions & 1 deletion api/handlers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ func (h *Node) peers(peers []peer.ID) []peerJSON {
resp[i] = peerJSON{
ID: id,
Connectedness: h.Network.Connectedness(id).String(),
Subnets: h.PeersIndex.GetPeerSubnets(id).String(),
}
subnets, ok := h.PeersIndex.GetPeerSubnets(id)
if ok {
resp[i].Subnets = subnets.String()
}

for _, addr := range h.Network.Peerstore().Addrs(id) {
Expand Down
8 changes: 3 additions & 5 deletions network/discovery/dv5_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/ssvlabs/ssv/network/records"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/network/records"
)

// limitNodeFilter returns true if the limit is exceeded
Expand Down Expand Up @@ -71,14 +72,11 @@ func (dvs *DiscV5Service) sharedSubnetsFilter(n int) func(node *enode.Node) bool
if n == 0 {
return true
}
if len(dvs.subnets) == 0 {
return true
}
nodeSubnets, err := records.GetSubnetsEntry(node.Record())
if err != nil {
return false
}
shared := records.SharedSubnets(dvs.subnets, nodeSubnets, n)
shared := dvs.subnets.SharedSubnets(nodeSubnets, n)
// logger.Debug("shared subnets", zap.Ints("shared", shared),
// zap.String("node", node.String()))

Expand Down
13 changes: 6 additions & 7 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package discovery

import (
"bytes"
"context"
"fmt"
"net"
Expand Down Expand Up @@ -62,7 +61,7 @@ type DiscV5Service struct {
sharedConn *SharedUDPConn

networkConfig networkconfig.NetworkConfig
subnets []byte
subnets records.Subnets

publishLock chan struct{}
}
Expand Down Expand Up @@ -177,7 +176,7 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
if err != nil {
return fmt.Errorf("could not read subnets: %w", err)
}
if bytes.Equal(zeroSubnets, nodeSubnets) {
if zeroSubnets == nodeSubnets {
return errors.New("zero subnets")
}

Expand Down Expand Up @@ -321,11 +320,11 @@ func (dvs *DiscV5Service) RegisterSubnets(logger *zap.Logger, subnets ...uint64)
if len(subnets) == 0 {
return false, nil
}
updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), subnets, nil)
updatedSubnets, isUpdated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), subnets, nil)
if err != nil {
return false, errors.Wrap(err, "could not update ENR")
}
if updatedSubnets != nil {
if isUpdated {
dvs.subnets = updatedSubnets
logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode()))
return true, nil
Expand All @@ -340,11 +339,11 @@ func (dvs *DiscV5Service) DeregisterSubnets(logger *zap.Logger, subnets ...uint6
if len(subnets) == 0 {
return false, nil
}
updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), nil, subnets)
updatedSubnets, isUpdated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), nil, subnets)
if err != nil {
return false, errors.Wrap(err, "could not update ENR")
}
if updatedSubnets != nil {
if isUpdated {
dvs.subnets = updatedSubnets
logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode()))
return true, nil
Expand Down
18 changes: 10 additions & 8 deletions network/discovery/dv5_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ func TestCheckPeer(t *testing.T) {
expectedError: nil,
},
{
name: "missing subnets",
domainType: &myDomainType,
subnets: nil,
expectedError: errors.New("could not read subnets"),
name: "missing subnets",
domainType: &myDomainType,
subnets: records.Subnets{},
missingSubnets: true,
expectedError: errors.New("could not read subnets"),
},
{
name: "inactive subnets",
Expand Down Expand Up @@ -138,7 +139,7 @@ func TestCheckPeer(t *testing.T) {
err := records.SetDomainTypeEntry(localNode, records.KeyNextDomainType, *test.nextDomainType)
require.NoError(t, err)
}
if test.subnets != nil {
if !test.missingSubnets {
err := records.SetSubnetsEntry(localNode, test.subnets)
require.NoError(t, err)
}
Expand Down Expand Up @@ -176,13 +177,14 @@ type checkPeerTest struct {
name string
domainType *spectypes.DomainType
nextDomainType *spectypes.DomainType
subnets []byte
subnets records.Subnets
missingSubnets bool
localNode *enode.LocalNode
expectedError error
}

func mockSubnets(active ...int) []byte {
subnets := make([]byte, commons.Subnets())
func mockSubnets(active ...int) records.Subnets {
subnets := records.Subnets{}
for _, subnet := range active {
subnets[subnet] = 1
}
Expand Down
3 changes: 2 additions & 1 deletion network/discovery/node_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ethereum/go-ethereum/p2p/enode"
spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/network/records"
)

Expand All @@ -16,7 +17,7 @@ func DecorateWithDomainType(key records.ENRKey, domainType spectypes.DomainType)
}
}

func DecorateWithSubnets(subnets []byte) NodeRecordDecoration {
func DecorateWithSubnets(subnets records.Subnets) NodeRecordDecoration {
return func(node *enode.LocalNode) error {
return records.SetSubnetsEntry(node, subnets)
}
Expand Down
5 changes: 3 additions & 2 deletions network/discovery/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ssvlabs/ssv/logging"
compatible_logger "github.com/ssvlabs/ssv/network/discovery/logger"
"github.com/ssvlabs/ssv/network/records"

"github.com/ssvlabs/ssv/network/commons"

Expand Down Expand Up @@ -34,8 +35,8 @@ type DiscV5Options struct {
NetworkKey *ecdsa.PrivateKey
// Bootnodes is a list of bootstrapper nodes
Bootnodes []string
// Subnets is a bool slice represents all the subnets the node is intreseted in
Subnets []byte
// Subnets is a bool slice encoded in hex, it represents all the subnets the node is interested in
Subnets records.Subnets
// EnableLogging when true enables logs to be emitted
EnableLogging bool
}
Expand Down
3 changes: 2 additions & 1 deletion network/discovery/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/networkconfig"
)
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestDiscV5Service_checkPeer(t *testing.T) {
dvs.conns.(*MockConnection).SetAtLimit(false)

// Valid peer but no common subnet
subnets := make([]byte, len(records.ZeroSubnets))
subnets := records.Subnets{}
subnets[10] = 1
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithCustomSubnets(t, subnets)))
require.ErrorContains(t, err, "no shared subnets")
Expand Down
15 changes: 8 additions & 7 deletions network/discovery/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/networkconfig"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -58,8 +60,7 @@ func testingDiscoveryOptions(t *testing.T, networkConfig networkconfig.NetworkCo
}

// Discovery options
allSubs, _ := records.Subnets{}.FromString(records.AllSubnets)
subnetsIndex := peers.NewSubnetsIndex(len(allSubs))
subnetsIndex := peers.NewSubnetsIndex(commons.Subnets())
connectionIndex := NewMockConnection()

return &Options{
Expand Down Expand Up @@ -160,7 +161,7 @@ func NodeWithoutNextDomain(t *testing.T) *enode.Node {
}

func NodeWithoutSubnets(t *testing.T) *enode.Node {
return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), false, nil)
return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), false, records.Subnets{})
}

func NodeWithCustomDomains(t *testing.T, domainType spectypes.DomainType, nextDomainType spectypes.DomainType) *enode.Node {
Expand All @@ -171,14 +172,14 @@ func NodeWithZeroSubnets(t *testing.T) *enode.Node {
return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), true, zeroSubnets)
}

func NodeWithCustomSubnets(t *testing.T, subnets []byte) *enode.Node {
func NodeWithCustomSubnets(t *testing.T, subnets records.Subnets) *enode.Node {
return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), true, subnets)
}

func CustomNode(t *testing.T,
setDomainType bool, domainType spectypes.DomainType,
setNextDomainType bool, nextDomainType spectypes.DomainType,
setSubnets bool, subnets []byte) *enode.Node {
setSubnets bool, subnets records.Subnets) *enode.Node {

// Generate key
nodeKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
Expand Down
20 changes: 8 additions & 12 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ type p2pNetwork struct {

backoffConnector *libp2pdiscbackoff.BackoffConnector

fixedSubnets []byte
activeSubnets []byte
fixedSubnets records.Subnets
activeSubnets records.Subnets

libConnManager connmgrcore.ConnManager

Expand Down Expand Up @@ -298,16 +298,14 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() {
ctx, cancel := context.WithTimeout(n.ctx, connManagerBalancingTimeout)
defer cancel()

mySubnets := records.Subnets(n.activeSubnets).Clone()

// Disconnect from irrelevant peers
disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(logger, maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, mySubnets)
disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(logger, maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, n.activeSubnets)
if disconnectedPeers > 0 {
return
}

// Trim peers according to subnet participation (considering the subnet size)
connMgr.TagBestPeers(logger, n.cfg.MaxPeers-1, mySubnets, allPeers, n.cfg.TopicMaxPeers)
connMgr.TagBestPeers(logger, n.cfg.MaxPeers-1, n.activeSubnets, allPeers, n.cfg.TopicMaxPeers)
connMgr.TrimPeers(ctx, logger, n.host.Network())
}
}
Expand Down Expand Up @@ -350,16 +348,14 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) {
// there is a pending PR to replace this: https://github.com/ssvlabs/ssv/pull/990
logger = logger.Named(logging.NameP2PNetwork)
ticker := time.NewTicker(time.Second)
registeredSubnets := make([]byte, commons.Subnets())
registeredSubnets := records.Subnets{}
defer ticker.Stop()

// Run immediately and then every second.
for ; true; <-ticker.C {
start := time.Now()

// Compute the new subnets according to the active committees/validators.
updatedSubnets := make([]byte, commons.Subnets())
copy(updatedSubnets, n.fixedSubnets)
updatedSubnets := n.fixedSubnets

n.activeCommittees.Range(func(cid string, status validatorStatus) bool {
subnet := commons.CommitteeSubnet(spectypes.CommitteeID([]byte(cid)))
Expand Down Expand Up @@ -399,7 +395,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) {
}

n.idx.UpdateSelfRecord(func(self *records.NodeInfo) *records.NodeInfo {
self.Metadata.Subnets = records.Subnets(n.activeSubnets).String()
self.Metadata.Subnets = n.activeSubnets.String()
return self
})

Expand Down Expand Up @@ -437,7 +433,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) {
}

allSubs, _ := records.Subnets{}.FromString(records.AllSubnets)
subnetsList := records.SharedSubnets(allSubs, n.activeSubnets, 0)
subnetsList := allSubs.SharedSubnets(n.activeSubnets, 0)
logger.Debug("updated subnets",
zap.Any("added", addedSubnets),
zap.Any("removed", removedSubnets),
Expand Down
6 changes: 1 addition & 5 deletions network/p2p/p2p_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,9 @@ func (n *p2pNetwork) SubscribeRandoms(logger *zap.Logger, numSubnets int) error
}
}

// Update the subnets slice.
subnets := make([]byte, commons.Subnets())
copy(subnets, n.fixedSubnets)
for _, subnet := range randomSubnets {
subnets[subnet] = byte(1)
n.fixedSubnets[subnet] = byte(1)
}
n.fixedSubnets = subnets

return nil
}
Expand Down
12 changes: 4 additions & 8 deletions network/p2p/p2p_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,12 @@ func (n *p2pNetwork) initCfg() error {
n.cfg.UserAgent = userAgent(n.cfg.UserAgent)
}
if len(n.cfg.Subnets) > 0 {
s := make(records.Subnets, 0)
s := records.Subnets{}
subnets, err := s.FromString(strings.Replace(n.cfg.Subnets, "0x", "", 1))
if err != nil {
return fmt.Errorf("parse subnet: %w", err)
}
n.fixedSubnets = subnets
} else {
n.fixedSubnets = make(records.Subnets, p2pcommons.Subnets())
}
if n.cfg.MaxPeers <= 0 {
n.cfg.MaxPeers = minPeersBuffer
Expand Down Expand Up @@ -181,7 +179,7 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error {
self := records.NewNodeInfo(domain)
self.Metadata = &records.NodeMetadata{
NodeVersion: commons.GetNodeVersion(),
Subnets: records.Subnets(n.fixedSubnets).String(),
Subnets: n.fixedSubnets.String(),
}
getPrivKey := func() crypto.PrivKey {
return libPrivKey
Expand Down Expand Up @@ -253,10 +251,8 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error {
Bootnodes: n.cfg.TransformBootnodes(),
EnableLogging: n.cfg.DiscoveryTrace,
}
if len(n.fixedSubnets) > 0 {
discV5Opts.Subnets = n.fixedSubnets
logger = logger.With(zap.String("subnets", records.Subnets(n.fixedSubnets).String()))
}
discV5Opts.Subnets = n.fixedSubnets
logger = logger.With(zap.String("subnets", n.fixedSubnets.String()))
logger.Info("discovery: using discv5",
zap.Strings("bootnodes", discV5Opts.Bootnodes),
zap.String("ip", discV5Opts.IP))
Expand Down
13 changes: 8 additions & 5 deletions network/peers/conn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ func (c connManager) getBestPeers(n int, mySubnets records.Subnets, allPeers []p
// Compute the score for each peer according to peer's subnets and subnets' score
var peerLogs []peerLog
for _, pid := range allPeers {
peerSubnets := c.subnetsIdx.GetPeerSubnets(pid)
peerSubnets, ok := c.subnetsIdx.GetPeerSubnets(pid)
var score PeerScore
if len(peerSubnets) == 0 {
if !ok {
// TODO: shouldn't we not connect to peers with no subnets?
c.logger.Debug("peer has no subnets", zap.String("peer", pid.String()))
score = -1000
Expand All @@ -129,7 +129,7 @@ func (c connManager) getBestPeers(n int, mySubnets records.Subnets, allPeers []p
peerLogs = append(peerLogs, peerLog{
Peer: pid,
Score: score,
SharedSubnets: len(records.SharedSubnets(peerSubnets, mySubnets, len(mySubnets))),
SharedSubnets: len(peerSubnets.SharedSubnets(mySubnets, len(mySubnets))),
})
}

Expand Down Expand Up @@ -231,8 +231,11 @@ func (c connManager) DisconnectFromBadPeers(logger *zap.Logger, net libp2pnetwor
func (c connManager) DisconnectFromIrrelevantPeers(logger *zap.Logger, disconnectQuota int, net libp2pnetwork.Network, allPeers []peer.ID, mySubnets records.Subnets) int {
disconnectedPeers := 0
for _, peerID := range allPeers {
peerSubnets := c.subnetsIdx.GetPeerSubnets(peerID)
sharedSubnets := records.SharedSubnets(mySubnets, peerSubnets, len(mySubnets))
var sharedSubnets []int
peerSubnets, ok := c.subnetsIdx.GetPeerSubnets(peerID)
if ok {
sharedSubnets = mySubnets.SharedSubnets(peerSubnets, len(mySubnets))
}

// If there's no common subnet, disconnect from peer.
if len(sharedSubnets) == 0 {
Expand Down
Loading

0 comments on commit 6b09804

Please sign in to comment.