Commit

Merge branch 'refs/heads/alan/no-fork' into alan/no-fork-spec-align-0cbc825

nkryuchkov committed Jul 25, 2024
2 parents d378790 + 6e4f5eb commit 2a75c9d
Showing 5 changed files with 146 additions and 50 deletions.
7 changes: 5 additions & 2 deletions network/p2p/p2p_validation_test.go
@@ -5,8 +5,6 @@ import (
cryptorand "crypto/rand"
"encoding/hex"
"fmt"
"github.com/cornelk/hashmap"
"github.com/libp2p/go-libp2p/core/peer"
"math/rand"
"os"
"sort"
@@ -15,6 +13,9 @@ import (
"testing"
"time"

"github.com/cornelk/hashmap"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/aquasecurity/table"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/sourcegraph/conc/pool"
@@ -265,6 +266,7 @@ func CreateVirtualNet(
node := vn.NodeByPeerID(selfPeer)
if node == nil {
t.Fatalf("self peer not found (%s)", selfPeer)
return
}

node.PeerScores.Range(func(index NodeIndex, snapshot *pubsub.PeerScoreSnapshot) bool {
@@ -275,6 +277,7 @@ func CreateVirtualNet(
peerNode := vn.NodeByPeerID(peerID)
if peerNode == nil {
t.Fatalf("peer not found (%s)", peerID)
return
}
node.PeerScores.Set(peerNode.Index, peerScore)
}
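
For readers unfamiliar with the concurrent map behind the test's PeerScores field, below is a minimal, hedged sketch of the cornelk/hashmap API whose import was regrouped above. The key and value types are illustrative assumptions, not the test's exact types.

package main

import (
	"fmt"

	"github.com/cornelk/hashmap"
)

func main() {
	// A lock-free map keyed by a node index, similar in spirit to PeerScores above.
	scores := hashmap.New[int, float64]()
	scores.Set(1, 0.5)
	scores.Set(2, -3.0)

	// Range keeps iterating while the callback returns true,
	// mirroring node.PeerScores.Range in the test.
	scores.Range(func(index int, score float64) bool {
		fmt.Printf("node %d -> %.2f\n", index, score)
		return true
	})
}
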
33 changes: 19 additions & 14 deletions network/topics/pubsub.go
@@ -153,20 +153,7 @@ func NewPubSub(ctx context.Context, logger *zap.Logger, cfg *PubSubConfig, metri
if cfg.ScoreIndex != nil || inspector != nil {
cfg.initScoring()

if inspector == nil {
peerConnected := func(pid peer.ID) bool {
return cfg.Host.Network().Connectedness(pid) == libp2pnetwork.Connected
}
inspector = scoreInspector(logger, cfg.ScoreIndex, scoreInspectLogFrequency, metrics, peerConnected)
}

if inspectInterval == 0 {
inspectInterval = defaultScoreInspectInterval
}

peerScoreParams := params.PeerScoreParams(cfg.Scoring.OneEpochDuration, cfg.MsgIDCacheTTL, cfg.DisableIPRateLimit, cfg.Scoring.IPWhilelist...)
psOpts = append(psOpts, pubsub.WithPeerScore(peerScoreParams, params.PeerScoreThresholds()),
pubsub.WithPeerScoreInspect(inspector, inspectInterval))
// Get topic score params factory
if cfg.GetValidatorStats == nil {
cfg.GetValidatorStats = func() (uint64, uint64, uint64, error) {
// default in case it was not injected
@@ -180,6 +167,24 @@ func NewPubSub(ctx context.Context, logger *zap.Logger, cfg *PubSubConfig, metri
}
return validatorTopicScoreParams(logger, cfg)(t)
}

// Get overall score params
peerScoreParams := params.PeerScoreParams(cfg.Scoring.OneEpochDuration, cfg.MsgIDCacheTTL, cfg.DisableIPRateLimit, cfg.Scoring.IPWhilelist...)

// Define score inspector
if inspector == nil {
peerConnected := func(pid peer.ID) bool {
return cfg.Host.Network().Connectedness(pid) == libp2pnetwork.Connected
}
inspector = scoreInspector(logger, cfg.ScoreIndex, scoreInspectLogFrequency, metrics, peerConnected, peerScoreParams, topicScoreFactory)
}
if inspectInterval == 0 {
inspectInterval = defaultScoreInspectInterval
}

// Append score params to pubsub options
psOpts = append(psOpts, pubsub.WithPeerScore(peerScoreParams, params.PeerScoreThresholds()),
pubsub.WithPeerScoreInspect(inspector, inspectInterval))
}

if cfg.MsgIDHandler != nil {
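
For reference, a minimal sketch of how the options assembled above plug into go-libp2p-pubsub. It reuses names from this file (params.PeerScoreThresholds, defaultScoreInspectInterval); the function name, host argument, and logging body are assumptions for illustration, and imports are elided.

func newScoredPubSub(ctx context.Context, h host.Host, logger *zap.Logger, peerScoreParams *pubsub.PeerScoreParams) (*pubsub.PubSub, error) {
	// An extended inspector receives a full score snapshot per peer.
	inspect := func(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {
		for pid, snapshot := range scores {
			logger.Debug("peer score",
				zap.String("peer", pid.String()),
				zap.Float64("score", snapshot.Score))
		}
	}
	return pubsub.NewGossipSub(ctx, h,
		pubsub.WithPeerScore(peerScoreParams, params.PeerScoreThresholds()),
		pubsub.WithPeerScoreInspect(inspect, defaultScoreInspectInterval),
	)
}
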
125 changes: 100 additions & 25 deletions network/topics/scoring.go
@@ -1,7 +1,6 @@
package topics

import (
"math"
"time"

"github.com/ssvlabs/ssv/logging/fields"
@@ -26,52 +25,128 @@ func DefaultScoringConfig() *ScoringConfig {

// scoreInspector inspects scores and updates the score index accordingly
// TODO: finalize once validation is in place
func scoreInspector(logger *zap.Logger, scoreIdx peers.ScoreIndex, logFrequency int, metrics Metrics, peerConnected func(pid peer.ID) bool) pubsub.ExtendedPeerScoreInspectFn {
func scoreInspector(logger *zap.Logger, scoreIdx peers.ScoreIndex, logFrequency int, metrics Metrics, peerConnected func(pid peer.ID) bool, peerScoreParams *pubsub.PeerScoreParams, topicScoreParamsFactory func(string) *pubsub.TopicScoreParams) pubsub.ExtendedPeerScoreInspectFn {
inspections := 0

return func(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {

if inspections%logFrequency != 0 {
// Don't log yet.
inspections++
return
}

// Reset metrics before updating them.
metrics.ResetPeerScores()

// Use a "scope cache" for getting a topic's score parameters
// otherwise, the factory method would be called multiple times for the same topic
topicScoreParamsCache := make(map[string]*pubsub.TopicScoreParams)
getScoreParamsForTopic := func(topic string) *pubsub.TopicScoreParams {
if topicParams, exists := topicScoreParamsCache[topic]; exists {
return topicParams
}
topicScoreParamsCache[topic] = topicScoreParamsFactory(topic)
return topicScoreParamsCache[topic]
}

// Log score for each peer
for pid, peerScores := range scores {
// Compute score-related stats for this peer.

// Store topic snapshot for topics with invalid messages
filtered := make(map[string]*pubsub.TopicScoreSnapshot)
var totalInvalidMessages float64
var totalLowMeshDeliveries int
var p4ScoreSquaresSum float64
for topic, snapshot := range peerScores.Topics {
p4ScoreSquaresSum += snapshot.InvalidMessageDeliveries * snapshot.InvalidMessageDeliveries

if snapshot.InvalidMessageDeliveries != 0 {
filtered[topic] = snapshot
}
if snapshot.InvalidMessageDeliveries > 0 {
totalInvalidMessages += math.Sqrt(snapshot.InvalidMessageDeliveries)
}
if snapshot.MeshMessageDeliveries < 107 {
totalLowMeshDeliveries++
}

// Compute p4 impact in final score for metrics
p4Impact := float64(0)

// Compute counters and weights for p1, p2, p4, p6 and p7.
// The subscores p3 and p5 are unused.

// P1 - Time in mesh
// The weight should be equal for all topics. So, we can just sum up the counters.
p1CounterSum := float64(0)

// P2 - First message deliveries
// The weight for the p2 score (w2) may be different through topics
// So, we store a list of counters P2c and their associated weights W2
// Note: if necessary, we may reduce the size of the log by summing P2c * W2 for all topics and logging this result
type P2Score struct {
P2Counter float64
W2 float64
}
p2 := make([]*P2Score, 0)

// P4 - InvalidMessageDeliveries
// The weight should be equal for all topics. So, we can just sum up the counters squared.
p4CounterSum := float64(0)

// Get counters for each topic
for topic, snapshot := range peerScores.Topics {

topicScoreParams := getScoreParamsForTopic(topic)

// Cap p1 as done in GossipSub
p1Counter := float64(snapshot.TimeInMesh / topicScoreParams.TimeInMeshQuantum)
if p1Counter > topicScoreParams.TimeInMeshCap {
p1Counter = topicScoreParams.TimeInMeshCap
}
p1CounterSum += p1Counter

// Square the P4 counter as done in GossipSub
p4CounterSquaredForTopic := snapshot.InvalidMessageDeliveries * snapshot.InvalidMessageDeliveries
p4CounterSum += p4CounterSquaredForTopic

// Update p4 impact on final score
p4Impact += topicScoreParams.TopicWeight * topicScoreParams.InvalidMessageDeliveriesWeight * p4CounterSquaredForTopic

// Store the counter and weight for P2
w2 := topicScoreParams.FirstMessageDeliveriesWeight
p2 = append(p2, &P2Score{
P2Counter: snapshot.FirstMessageDeliveries,
W2: w2,
})
}

// Get weights for P1 and P4 (w1 and w4), which should be equal for all topics
w1 := float64(0)
w4 := float64(0)
for topic := range peerScores.Topics {
topicScoreParams := getScoreParamsForTopic(topic)
w1 = topicScoreParams.TimeInMeshWeight
w4 = topicScoreParams.InvalidMessageDeliveriesWeight
break
}

// P6 - IP Colocation factor
p6 := peerScores.IPColocationFactor
w6 := peerScoreParams.IPColocationFactorWeight

// P7 - Behaviour penalty
p7 := peerScores.BehaviourPenalty
w7 := peerScoreParams.BehaviourPenaltyWeight

// Update metrics.
metrics.PeerScore(pid, peerScores.Score)
metrics.PeerP4Score(pid, p4ScoreSquaresSum)

if inspections%logFrequency != 0 {
// Don't log yet.
continue
}
metrics.PeerP4Score(pid, p4Impact)

// Log.
fields := []zap.Field{
fields.PeerID(pid),
fields.PeerScore(peerScores.Score),
zap.Any("invalid_messages", filtered),
zap.Float64("ip_colocation", peerScores.IPColocationFactor),
zap.Float64("behaviour_penalty", peerScores.BehaviourPenalty),
zap.Float64("app_specific_penalty", peerScores.AppSpecificScore),
zap.Float64("total_low_mesh_deliveries", float64(totalLowMeshDeliveries)),
zap.Float64("total_invalid_messages", totalInvalidMessages),
zap.Float64("p1_time_in_mesh", p1CounterSum),
zap.Float64("w1_time_in_mesh", w1),
zap.Any("p2_first_message_deliveries", p2),
zap.Float64("p4_invalid_message_deliveries", p4CounterSum),
zap.Float64("w4_invalid_message_deliveries", w4),
zap.Float64("p6_ip_colocation_factor", p6),
zap.Float64("w6_ip_colocation_factor", w6),
zap.Float64("p7_behaviour_penalty", p7),
zap.Float64("w7_behaviour_penalty", w7),
zap.Any("invalid_messages", filtered),
}
if peerConnected(pid) {
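
To make the counters and weights that the inspector now logs easier to interpret, here is a rough sketch of how GossipSub v1.1 folds them into a single peer score. It is an approximation for illustration only: score decay, the topic score cap, the P3/P3b mesh-delivery penalties, and the P5 application-specific term are omitted, and the go-libp2p-pubsub types are assumed to be imported as pubsub.

func approximateScore(snap *pubsub.PeerScoreSnapshot,
	topicParams func(topic string) *pubsub.TopicScoreParams,
	global *pubsub.PeerScoreParams) float64 {

	var score float64
	for topic, t := range snap.Topics {
		p := topicParams(topic)

		// P1: time in mesh, counted in quanta and capped.
		p1 := float64(t.TimeInMesh / p.TimeInMeshQuantum)
		if p1 > p.TimeInMeshCap {
			p1 = p.TimeInMeshCap
		}

		// P2: first message deliveries (positively weighted).
		p2 := t.FirstMessageDeliveries

		// P4: invalid message deliveries, squared (negatively weighted).
		p4 := t.InvalidMessageDeliveries * t.InvalidMessageDeliveries

		score += p.TopicWeight * (p.TimeInMeshWeight*p1 +
			p.FirstMessageDeliveriesWeight*p2 +
			p.InvalidMessageDeliveriesWeight*p4)
	}

	// P6: IP colocation factor, multiplied by its (negative) weight.
	score += global.IPColocationFactorWeight * snap.IPColocationFactor

	// P7: behaviour penalty; only the excess over the threshold counts, squared.
	if excess := snap.BehaviourPenalty - global.BehaviourPenaltyThreshold; excess > 0 {
		score += global.BehaviourPenaltyWeight * excess * excess
	}

	return score
}
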
27 changes: 18 additions & 9 deletions protocol/v2/ssv/validator/committee.go
@@ -84,6 +84,22 @@ func (c *Committee) RemoveShare(validatorIndex phase0.ValidatorIndex) {
}
}

func (c *Committee) StartConsumeQueue(logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
c.mtx.Lock()
defer c.mtx.Unlock()

r := c.Runners[duty.Slot]
if r == nil {
return errors.New(fmt.Sprintf("no runner found for slot %d", duty.Slot))
}

go func() {
err := c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, r)
logger.Error("failed consuming queue", zap.Error(err))
}()
return nil
}

// StartDuty starts a new duty for the given slot
func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
c.logger.Debug("Starting committee duty runner", zap.Uint64("slot", uint64(duty.Slot)))
@@ -162,17 +178,10 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty)

}

logger = c.logger.With(fields.DutyID(fields.FormatCommitteeDutyID(c.Operator.Committee, c.BeaconNetwork.EstimatedEpochAtSlot(duty.Slot), duty.Slot)), fields.Slot(duty.Slot))
// TODO alan: stop queue
go func() {
err := c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, c.Runners[duty.Slot])
if err != nil {
logger.Warn("handles error message", zap.Error(err))
}
}()
runnerLogger := c.logger.With(fields.DutyID(fields.FormatCommitteeDutyID(c.Operator.Committee, c.BeaconNetwork.EstimatedEpochAtSlot(duty.Slot), duty.Slot)), fields.Slot(duty.Slot))

logger.Info("ℹ️ starting duty processing")
return c.Runners[duty.Slot].StartNewDuty(logger, duty, c.Operator.GetQuorum())
return c.Runners[duty.Slot].StartNewDuty(runnerLogger, duty, c.Operator.GetQuorum())
}

// NOT threadsafe
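
One note on the goroutine added in StartConsumeQueue: it logs the ConsumeQueue result unconditionally, even when the returned error is nil. A minimal variant that logs only real failures could look like the sketch below (an assumption about the intent, reusing the names from the diff; not the committed code).

go func() {
	if err := c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, r); err != nil {
		logger.Error("failed consuming queue", zap.Error(err))
	}
}()
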
4 changes: 4 additions & 0 deletions protocol/v2/ssv/validator/duty_executer.go
@@ -38,5 +38,9 @@ func (c *Committee) OnExecuteDuty(logger *zap.Logger, msg *types.EventMsg) error
return fmt.Errorf("could not start committee duty: %w", err)
}

if err := c.StartConsumeQueue(logger, executeDutyData.Duty); err != nil {
return fmt.Errorf("could not start committee consume queue: %w", err)
}

return nil
}
