From 454a5c5907c79ab08694dcad096426d51abc2132 Mon Sep 17 00:00:00 2001
From: MatheusFranco99 <48058141+MatheusFranco99@users.noreply.github.com>
Date: Thu, 25 Jul 2024 13:38:29 +0100
Subject: [PATCH 1/4] Improve p2p scoring log (#1500)

* Improve p2p scoring logs with counters and weights for p1, p2, p4, p6 and p7

* Improve comments for scoreInspector function
---
 network/topics/pubsub.go  |  33 +++++-----
 network/topics/scoring.go | 125 ++++++++++++++++++++++++++++++--------
 2 files changed, 119 insertions(+), 39 deletions(-)

diff --git a/network/topics/pubsub.go b/network/topics/pubsub.go
index b0826e70f5..20143b0e2a 100644
--- a/network/topics/pubsub.go
+++ b/network/topics/pubsub.go
@@ -153,20 +153,7 @@ func NewPubSub(ctx context.Context, logger *zap.Logger, cfg *PubSubConfig, metri
 	if cfg.ScoreIndex != nil || inspector != nil {
 		cfg.initScoring()
 
-		if inspector == nil {
-			peerConnected := func(pid peer.ID) bool {
-				return cfg.Host.Network().Connectedness(pid) == libp2pnetwork.Connected
-			}
-			inspector = scoreInspector(logger, cfg.ScoreIndex, scoreInspectLogFrequency, metrics, peerConnected)
-		}
-
-		if inspectInterval == 0 {
-			inspectInterval = defaultScoreInspectInterval
-		}
-
-		peerScoreParams := params.PeerScoreParams(cfg.Scoring.OneEpochDuration, cfg.MsgIDCacheTTL, cfg.DisableIPRateLimit, cfg.Scoring.IPWhilelist...)
-		psOpts = append(psOpts, pubsub.WithPeerScore(peerScoreParams, params.PeerScoreThresholds()),
-			pubsub.WithPeerScoreInspect(inspector, inspectInterval))
+		// Get topic score params factory
 		if cfg.GetValidatorStats == nil {
 			cfg.GetValidatorStats = func() (uint64, uint64, uint64, error) {
 				// default in case it was not injected
@@ -180,6 +167,24 @@ func NewPubSub(ctx context.Context, logger *zap.Logger, cfg *PubSubConfig, metri
 			}
 			return validatorTopicScoreParams(logger, cfg)(t)
 		}
+
+		// Get overall score params
+		peerScoreParams := params.PeerScoreParams(cfg.Scoring.OneEpochDuration, cfg.MsgIDCacheTTL, cfg.DisableIPRateLimit, cfg.Scoring.IPWhilelist...)
+
+		// Define score inspector
+		if inspector == nil {
+			peerConnected := func(pid peer.ID) bool {
+				return cfg.Host.Network().Connectedness(pid) == libp2pnetwork.Connected
+			}
+			inspector = scoreInspector(logger, cfg.ScoreIndex, scoreInspectLogFrequency, metrics, peerConnected, peerScoreParams, topicScoreFactory)
+		}
+		if inspectInterval == 0 {
+			inspectInterval = defaultScoreInspectInterval
+		}
+
+		// Append score params to pubsub options
+		psOpts = append(psOpts, pubsub.WithPeerScore(peerScoreParams, params.PeerScoreThresholds()),
+			pubsub.WithPeerScoreInspect(inspector, inspectInterval))
 	}
 
 	if cfg.MsgIDHandler != nil {
diff --git a/network/topics/scoring.go b/network/topics/scoring.go
index 8463ee5912..c25954b442 100644
--- a/network/topics/scoring.go
+++ b/network/topics/scoring.go
@@ -1,7 +1,6 @@
 package topics
 
 import (
-	"math"
 	"time"
 
 	"github.com/ssvlabs/ssv/logging/fields"
@@ -26,52 +25,128 @@ func DefaultScoringConfig() *ScoringConfig {
 
 // scoreInspector inspects scores and updates the score index accordingly
 // TODO: finalize once validation is in place
-func scoreInspector(logger *zap.Logger, scoreIdx peers.ScoreIndex, logFrequency int, metrics Metrics, peerConnected func(pid peer.ID) bool) pubsub.ExtendedPeerScoreInspectFn {
+func scoreInspector(logger *zap.Logger, scoreIdx peers.ScoreIndex, logFrequency int, metrics Metrics, peerConnected func(pid peer.ID) bool, peerScoreParams *pubsub.PeerScoreParams, topicScoreParamsFactory func(string) *pubsub.TopicScoreParams) pubsub.ExtendedPeerScoreInspectFn {
 	inspections := 0
 
 	return func(scores map[peer.ID]*pubsub.PeerScoreSnapshot) {
+
+		if inspections%logFrequency != 0 {
+			// Don't log yet.
+			inspections++
+			return
+		}
+
 		// Reset metrics before updating them.
 		metrics.ResetPeerScores()
 
+		// Use a "scope cache" for getting a topic's score parameters;
+		// otherwise, the factory method would be called multiple times for the same topic.
+		topicScoreParamsCache := make(map[string]*pubsub.TopicScoreParams)
+		getScoreParamsForTopic := func(topic string) *pubsub.TopicScoreParams {
+			if topicParams, exists := topicScoreParamsCache[topic]; exists {
+				return topicParams
+			}
+			topicScoreParamsCache[topic] = topicScoreParamsFactory(topic)
+			return topicScoreParamsCache[topic]
+		}
+
+		// Log score for each peer
 		for pid, peerScores := range scores {
-			// Compute score-related stats for this peer.
+
+			// Store topic snapshot for topics with invalid messages
 			filtered := make(map[string]*pubsub.TopicScoreSnapshot)
-			var totalInvalidMessages float64
-			var totalLowMeshDeliveries int
-			var p4ScoreSquaresSum float64
 			for topic, snapshot := range peerScores.Topics {
-				p4ScoreSquaresSum += snapshot.InvalidMessageDeliveries * snapshot.InvalidMessageDeliveries
-
 				if snapshot.InvalidMessageDeliveries != 0 {
 					filtered[topic] = snapshot
 				}
-				if snapshot.InvalidMessageDeliveries > 0 {
-					totalInvalidMessages += math.Sqrt(snapshot.InvalidMessageDeliveries)
-				}
-				if snapshot.MeshMessageDeliveries < 107 {
-					totalLowMeshDeliveries++
+			}
+
+			// Compute the p4 impact on the final score for metrics
+			p4Impact := float64(0)
+
+			// Compute counters and weights for p1, p2, p4, p6 and p7.
+			// The subscores p3 and p5 are unused.
+
+			// P1 - Time in mesh
+			// The weight should be equal for all topics, so we can just sum up the counters.
+			p1CounterSum := float64(0)
+
+			// P2 - First message deliveries
+			// The weight for the p2 score (w2) may differ across topics,
+			// so we store a list of counters (P2c) along with their associated weights (W2).
+			// Note: if necessary, we may reduce the size of the log by summing P2c * W2 for all topics and logging the result.
+			type P2Score struct {
+				P2Counter float64
+				W2        float64
+			}
+			p2 := make([]*P2Score, 0)
+
+			// P4 - InvalidMessageDeliveries
+			// The weight should be equal for all topics, so we can just sum up the squared counters.
+			p4CounterSum := float64(0)
+
+			// Get counters for each topic
+			for topic, snapshot := range peerScores.Topics {
+
+				topicScoreParams := getScoreParamsForTopic(topic)
+
+				// Cap p1 as done in GossipSub
+				p1Counter := float64(snapshot.TimeInMesh / topicScoreParams.TimeInMeshQuantum)
+				if p1Counter > topicScoreParams.TimeInMeshCap {
+					p1Counter = topicScoreParams.TimeInMeshCap
+				}
+				p1CounterSum += p1Counter
+
+				// Square the P4 counter as done in GossipSub
+				p4CounterSquaredForTopic := snapshot.InvalidMessageDeliveries * snapshot.InvalidMessageDeliveries
+				p4CounterSum += p4CounterSquaredForTopic
+
+				// Update p4 impact on final score
+				p4Impact += topicScoreParams.TopicWeight * topicScoreParams.InvalidMessageDeliveriesWeight * p4CounterSquaredForTopic
+
+				// Store the counter and weight for P2
+				w2 := topicScoreParams.FirstMessageDeliveriesWeight
+				p2 = append(p2, &P2Score{
+					P2Counter: snapshot.FirstMessageDeliveries,
+					W2:        w2,
+				})
+			}
+
+			// Get weights for P1 and P4 (w1 and w4), which should be equal for all topics
+			w1 := float64(0)
+			w4 := float64(0)
+			for topic := range peerScores.Topics {
+				topicScoreParams := getScoreParamsForTopic(topic)
+				w1 = topicScoreParams.TimeInMeshWeight
+				w4 = topicScoreParams.InvalidMessageDeliveriesWeight
+				break
+			}
+
+			// P6 - IP Colocation factor
+			p6 := peerScores.IPColocationFactor
+			w6 := peerScoreParams.IPColocationFactorWeight
+
+			// P7 - Behaviour penalty
+			p7 := peerScores.BehaviourPenalty
+			w7 := peerScoreParams.BehaviourPenaltyWeight
+
 			// Update metrics.
 			metrics.PeerScore(pid, peerScores.Score)
-			metrics.PeerP4Score(pid, p4ScoreSquaresSum)
-
-			if inspections%logFrequency != 0 {
-				// Don't log yet.
-				continue
-			}
+			metrics.PeerP4Score(pid, p4Impact)
 
 			// Log.
 			fields := []zap.Field{
 				fields.PeerID(pid),
 				fields.PeerScore(peerScores.Score),
-				zap.Any("invalid_messages", filtered),
-				zap.Float64("ip_colocation", peerScores.IPColocationFactor),
-				zap.Float64("behaviour_penalty", peerScores.BehaviourPenalty),
-				zap.Float64("app_specific_penalty", peerScores.AppSpecificScore),
-				zap.Float64("total_low_mesh_deliveries", float64(totalLowMeshDeliveries)),
-				zap.Float64("total_invalid_messages", totalInvalidMessages),
+				zap.Float64("p1_time_in_mesh", p1CounterSum),
+				zap.Float64("w1_time_in_mesh", w1),
+				zap.Any("p2_first_message_deliveries", p2),
+				zap.Float64("p4_invalid_message_deliveries", p4CounterSum),
+				zap.Float64("w4_invalid_message_deliveries", w4),
+				zap.Float64("p6_ip_colocation_factor", p6),
+				zap.Float64("w6_ip_colocation_factor", w6),
+				zap.Float64("p7_behaviour_penalty", p7),
+				zap.Float64("w7_behaviour_penalty", w7),
+				zap.Any("invalid_messages", filtered),
 			}
 
 			if peerConnected(pid) {
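Note (PATCH 1/4): the counters and weights logged above map onto GossipSub's
scoring function. Ignoring the unused subscores (p3, p3b, p5) and the optional
per-topic score cap, the total is roughly

    score = sum over topics t of topicWeight(t) * (w1*p1(t) + w2*p2(t) + w4*p4(t))
          + w6*p6 + w7*p7

where p4(t) is the squared invalid-deliveries counter and p7 is applied to the
behaviour-penalty excess over its threshold, squared. The standalone sketch
below shows the arithmetic under those assumptions; the topicCounters type and
the gossipsubScore helper are illustrative and not part of the SSV codebase.

    package main

    import "fmt"

    // topicCounters holds the per-topic values that the inspector logs.
    type topicCounters struct {
        topicWeight float64
        p1, w1      float64 // capped time-in-mesh counter and its weight
        p2, w2      float64 // first-message-deliveries counter and its weight
        p4, w4      float64 // squared invalid-deliveries counter and its (negative) weight
    }

    // gossipsubScore combines the logged counters and weights the way
    // go-libp2p-pubsub does, restricted to p1, p2, p4, p6 and p7.
    func gossipsubScore(topics []topicCounters, p6, w6, behaviourPenalty, behaviourThreshold, w7 float64) float64 {
        score := 0.0
        for _, t := range topics {
            score += t.topicWeight * (t.w1*t.p1 + t.w2*t.p2 + t.w4*t.p4)
        }
        // p6 is already a squared surplus (colocated peers per IP), so it is weighted directly.
        score += w6 * p6
        // p7 only applies above its threshold; the excess is squared before weighting.
        if excess := behaviourPenalty - behaviourThreshold; excess > 0 {
            score += w7 * excess * excess
        }
        return score
    }

    func main() {
        topics := []topicCounters{
            {topicWeight: 1, p1: 10, w1: 0.03, p2: 50, w2: 1, p4: 4, w4: -100},
        }
        // One topic, no IP colocation, behaviour penalty below threshold:
        // 1*(0.3 + 50 - 400) = -349.7
        fmt.Println(gossipsubScore(topics, 0, -35, 2, 6, -15))
    }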
From 6e4f5eb4ff2e6f70ecf9ea13a59f085970f68e78 Mon Sep 17 00:00:00 2001
From: Anton Korpusenko
Date: Thu, 25 Jul 2024 18:09:08 +0300
Subject: [PATCH 2/4] (alan/no-fork) Committee runner: Data race fix (#1487)

* fixed data race

* removed locking of all consumers

* split StartDuty and consume-queue flows to prevent a data race in spec tests

* fixed linting issues
---
 network/p2p/p2p_validation_test.go         |  7 ++++--
 protocol/v2/ssv/validator/committee.go     | 27 ++++++++++++++--------
 protocol/v2/ssv/validator/duty_executer.go |  4 ++++
 3 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/network/p2p/p2p_validation_test.go b/network/p2p/p2p_validation_test.go
index a632261ced..af3ffdc902 100644
--- a/network/p2p/p2p_validation_test.go
+++ b/network/p2p/p2p_validation_test.go
@@ -5,8 +5,6 @@ import (
 	cryptorand "crypto/rand"
 	"encoding/hex"
 	"fmt"
-	"github.com/cornelk/hashmap"
-	"github.com/libp2p/go-libp2p/core/peer"
 	"math/rand"
 	"os"
 	"sort"
@@ -15,6 +13,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/cornelk/hashmap"
+	"github.com/libp2p/go-libp2p/core/peer"
+
 	"github.com/aquasecurity/table"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/sourcegraph/conc/pool"
@@ -265,6 +266,7 @@ func CreateVirtualNet(
 	node := vn.NodeByPeerID(selfPeer)
 	if node == nil {
 		t.Fatalf("self peer not found (%s)", selfPeer)
+		return
 	}
 
 	node.PeerScores.Range(func(index NodeIndex, snapshot *pubsub.PeerScoreSnapshot) bool {
@@ -275,6 +277,7 @@ func CreateVirtualNet(
 			peerNode := vn.NodeByPeerID(peerID)
 			if peerNode == nil {
 				t.Fatalf("peer not found (%s)", peerID)
+				return
 			}
 			node.PeerScores.Set(peerNode.Index, peerScore)
 		}
diff --git a/protocol/v2/ssv/validator/committee.go b/protocol/v2/ssv/validator/committee.go
index f3b8e3cd9b..ca63b17b44 100644
--- a/protocol/v2/ssv/validator/committee.go
+++ b/protocol/v2/ssv/validator/committee.go
@@ -87,6 +87,22 @@ func (c *Committee) RemoveShare(validatorIndex phase0.ValidatorIndex) {
 	}
 }
 
+// StartConsumeQueue spawns a goroutine that consumes the message queue for the given duty's slot.
+func (c *Committee) StartConsumeQueue(logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+
+	r := c.Runners[duty.Slot]
+	if r == nil {
+		return fmt.Errorf("no runner found for slot %d", duty.Slot)
+	}
+
+	go func() {
+		if err := c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, r); err != nil {
+			logger.Error("failed consuming queue", zap.Error(err))
+		}
+	}()
+	return nil
+}
+
 // StartDuty starts a new duty for the given slot
 func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
 	c.logger.Debug("Starting committee duty runner", zap.Uint64("slot", uint64(duty.Slot)))
@@ -165,17 +181,10 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty)
 	}
 
-	logger = c.logger.With(fields.DutyID(fields.FormatCommitteeDutyID(c.Operator.Committee, c.BeaconNetwork.EstimatedEpochAtSlot(duty.Slot), duty.Slot)), fields.Slot(duty.Slot))
-	// TODO alan: stop queue
-	go func() {
-		err := c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, c.Runners[duty.Slot])
-		if err != nil {
-			logger.Warn("handles error message", zap.Error(err))
-		}
-	}()
+	runnerLogger := c.logger.With(fields.DutyID(fields.FormatCommitteeDutyID(c.Operator.Committee, c.BeaconNetwork.EstimatedEpochAtSlot(duty.Slot), duty.Slot)), fields.Slot(duty.Slot))
 
 	logger.Info("ℹ️ starting duty processing")
-	return c.Runners[duty.Slot].StartNewDuty(logger, duty, c.Operator.GetQuorum())
+	return c.Runners[duty.Slot].StartNewDuty(runnerLogger, duty, c.Operator.GetQuorum())
 }
 
 // NOT threadsafe
diff --git a/protocol/v2/ssv/validator/duty_executer.go b/protocol/v2/ssv/validator/duty_executer.go
index bd4e2d7c18..ab1957d3d0 100644
--- a/protocol/v2/ssv/validator/duty_executer.go
+++ b/protocol/v2/ssv/validator/duty_executer.go
@@ -38,5 +38,9 @@ func (c *Committee) OnExecuteDuty(logger *zap.Logger, msg *types.EventMsg) error
 		return fmt.Errorf("could not start committee duty: %w", err)
 	}
 
+	if err := c.StartConsumeQueue(logger, executeDutyData.Duty); err != nil {
+		return fmt.Errorf("could not start committee consume queue: %w", err)
+	}
+
 	return nil
 }
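Note (PATCH 2/4): the core of the fix above is a capture-under-lock pattern:
StartConsumeQueue reads the shared Runners map while holding c.mtx and hands
only the resulting runner value to the goroutine, instead of letting the
goroutine index the map concurrently with writers. A minimal standalone sketch
of the pattern follows; the committee and runner types here are illustrative,
not the SSV ones.

    package main

    import (
        "fmt"
        "sync"
    )

    type runner struct{ slot uint64 }

    type committee struct {
        mtx     sync.Mutex
        runners map[uint64]*runner
        wg      sync.WaitGroup
    }

    // startConsume reads the shared map under the lock and passes the value
    // into the goroutine, so the goroutine never touches the map itself.
    func (c *committee) startConsume(slot uint64) error {
        c.mtx.Lock()
        defer c.mtx.Unlock()

        r := c.runners[slot]
        if r == nil {
            return fmt.Errorf("no runner found for slot %d", slot)
        }

        c.wg.Add(1)
        go func() {
            defer c.wg.Done()
            fmt.Println("consuming queue for slot", r.slot) // r was captured under the lock
        }()
        return nil
    }

    func main() {
        c := &committee{runners: map[uint64]*runner{42: {slot: 42}}}
        if err := c.startConsume(42); err != nil {
            fmt.Println(err)
        }
        c.wg.Wait()
    }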
From 6954be71f39b7dd8549829acca0a54ca8cf5bd6d Mon Sep 17 00:00:00 2001
From: Nikita Kryuchkov
Date: Thu, 25 Jul 2024 12:38:11 -0300
Subject: [PATCH 3/4] move prevDecided check before broadcastDecided

---
 protocol/v2/qbft/controller/controller.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/protocol/v2/qbft/controller/controller.go b/protocol/v2/qbft/controller/controller.go
index d3e57568b2..cbe4cba22c 100644
--- a/protocol/v2/qbft/controller/controller.go
+++ b/protocol/v2/qbft/controller/controller.go
@@ -135,6 +135,10 @@ func (c *Controller) UponExistingInstanceMsg(logger *zap.Logger, msg *specqbft.P
 		return nil, errors.Wrap(err, "could not process msg")
 	}
 
+	if prevDecided {
+		return nil, err
+	}
+
 	// save the highest Decided
 	if !decided {
 		return nil, nil
@@ -151,10 +155,6 @@ func (c *Controller) UponExistingInstanceMsg(logger *zap.Logger, msg *specqbft.P
 		logger.Debug("❌ failed to broadcast decided message", zap.Error(err))
 	}
 
-	if prevDecided {
-		return nil, err
-	}
-
 	return decidedMsg, nil
 }

From d378790b121e512ab07ed7b6077941ca0570f25f Mon Sep 17 00:00:00 2001
From: Nikita Kryuchkov
Date: Thu, 25 Jul 2024 12:50:32 -0300
Subject: [PATCH 4/4] delete SignedMessageAPI

---
 exporter/api/msg.go | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/exporter/api/msg.go b/exporter/api/msg.go
index 6f6084a001..729da28e3c 100644
--- a/exporter/api/msg.go
+++ b/exporter/api/msg.go
@@ -9,6 +9,7 @@ import (
 	"github.com/pkg/errors"
 	specqbft "github.com/ssvlabs/ssv-spec/qbft"
+
 	qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
 )
 
@@ -22,14 +23,6 @@ type Message struct {
 	Data interface{} `json:"data,omitempty"`
 }
 
-type SignedMessageAPI struct {
-	Signature spectypes.Signature
-	Signers   []spectypes.OperatorID
-	Message   specqbft.Message
-
-	FullData *spectypes.ValidatorConsensusData
-}
-
 type ParticipantsAPI struct {
 	Signers []spectypes.OperatorID
 	Slot    phase0.Slot
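Note (PATCH 3/4): with the prevDecided check moved up, UponExistingInstanceMsg
returns before saving the highest decided or re-broadcasting the decided
message when the instance had already decided earlier. A small standalone
sketch of the resulting ordering; the function and names are illustrative,
condensed from the controller flow.

    package main

    import "fmt"

    // uponDecided mimics the post-patch ordering: an already-decided instance
    // short-circuits before anything is saved or broadcast.
    func uponDecided(prevDecided, decided bool) string {
        if prevDecided {
            return "skip: already decided, nothing saved or broadcast"
        }
        if !decided {
            return "skip: not decided yet"
        }
        return "save highest decided, then broadcast decided message"
    }

    func main() {
        fmt.Println(uponDecided(true, true))   // the case the patch reorders
        fmt.Println(uponDecided(false, true))  // fresh decision: save + broadcast
        fmt.Println(uponDecided(false, false)) // instance still running
    }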