Skip to content

Commit

Permalink
align committee runner consensus logs (#1475)
Browse files Browse the repository at this point in the history
* align committee runner consensus logs

* change convert.Role to spectype.RunnerRole in runners

* added back logs for debug

* change got partial signatures log to contain more details

---------

Co-authored-by: guy muroch <[email protected]>
  • Loading branch information
guym-blox and guy muroch authored Jul 16, 2024
1 parent e854033 commit ebe4cef
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 63 deletions.
2 changes: 1 addition & 1 deletion protocol/v2/ssv/runner/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewAggregatorRunner(
operatorSigner: operatorSigner,
valCheck: valCheck,

metrics: metrics.NewConsensusMetrics(spectypes.BNRoleAggregator),
metrics: metrics.NewConsensusMetrics(spectypes.RoleAggregator),
}
}

Expand Down
15 changes: 14 additions & 1 deletion protocol/v2/ssv/runner/attester.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
package runner

//
//import (
// "encoding/hex"
// "github.com/attestantio/go-eth2-client/spec/phase0"
// "github.com/prysmaticlabs/go-bitfield"
// specssv "github.com/ssvlabs/ssv-spec/ssv"
// spectypes "github.com/ssvlabs/ssv-spec/types"
// "github.com/ssvlabs/ssv/logging/fields"
// "github.com/ssvlabs/ssv/protocol/v2/qbft/controller"
// "github.com/ssvlabs/ssv/protocol/v2/ssv/runner/metrics"
// "go.uber.org/zap"
// "time"
//)
//
//type AttesterRunner struct {
// BaseRunner *BaseRunner
Expand Down Expand Up @@ -40,7 +53,7 @@ package runner
// operatorSigner: operatorSigner,
// valCheck: valCheck,
//
// metrics: metrics.NewConsensusMetrics(spectypes.BNRoleAttester),
// metrics: metrics.NewConsensusMetrics(spectypes.RoleAttester),
// }
//}
//
Expand Down
102 changes: 50 additions & 52 deletions protocol/v2/ssv/runner/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"strconv"
"github.com/ssvlabs/ssv/protocol/v2/ssv/runner/metrics"
"time"

"github.com/attestantio/go-eth2-client/spec/altair"
Expand Down Expand Up @@ -33,21 +33,18 @@ import (
//}

type CommitteeRunner struct {
BaseRunner *BaseRunner
domain spectypes.DomainType
beacon beacon.BeaconNode
network specqbft.Network
signer types.BeaconSigner
operatorSigner types.OperatorSigner
valCheck specqbft.ProposedValueCheckF

BaseRunner *BaseRunner
network specqbft.Network
beacon beacon.BeaconNode
signer types.BeaconSigner
operatorSigner types.OperatorSigner
domain spectypes.DomainType
valCheck specqbft.ProposedValueCheckF
stoppedValidators map[spectypes.ValidatorPK]struct{}
submittedDuties map[types.BeaconRole]map[phase0.ValidatorIndex]struct{}

submittedDuties map[types.BeaconRole]map[phase0.ValidatorIndex]struct{}

started time.Time
consensusDone time.Time
postStarted time.Time
started time.Time
metrics metrics.ConsensusMetrics
}

func NewCommitteeRunner(
Expand Down Expand Up @@ -145,10 +142,6 @@ func (cr *CommitteeRunner) UnmarshalJSON(data []byte) error {
valCheck specqbft.ProposedValueCheckF
//
//stoppedValidators map[spectypes.ValidatorPK]struct{}
//
//started time.Time
//consensusDone time.Time
//postStarted time.Time
}

// Unmarshal the JSON data into the auxiliary struct
Expand All @@ -165,10 +158,6 @@ func (cr *CommitteeRunner) UnmarshalJSON(data []byte) error {
cr.operatorSigner = aux.operatorSigner
cr.valCheck = aux.valCheck
//cr.stoppedValidators = aux.stoppedValidators
//cr.started = aux.started
//cr.consensusDone = aux.consensusDone
//cr.postStarted = aux.postStarted

return nil
}

Expand Down Expand Up @@ -211,9 +200,8 @@ func (cr *CommitteeRunner) ProcessConsensus(logger *zap.Logger, msg *types.Signe
return nil
}

cr.consensusDone = time.Now()
cr.postStarted = time.Now()

cr.metrics.EndConsensus()
cr.metrics.StartPostConsensus()
// decided means consensus is done

duty := cr.BaseRunner.State.StartingDuty
Expand Down Expand Up @@ -245,15 +233,13 @@ func (cr *CommitteeRunner) ProcessConsensus(logger *zap.Logger, msg *types.Signe
if err != nil {
return errors.Wrap(err, "failed to hash attestation data")
}
logger.Debug("signed attestation data",
zap.Int("validator_index", int(duty.ValidatorIndex)),
logger.Debug("signed attestation data", zap.Int("validator_index", int(duty.ValidatorIndex)),
zap.String("pub_key", hex.EncodeToString(duty.PubKey[:])),
zap.Any("attestation_data", attestationData),
zap.String("attestation_data_root", hex.EncodeToString(adr[:])),
zap.String("signing_root", hex.EncodeToString(partialMsg.SigningRoot[:])),
zap.String("signature", hex.EncodeToString(partialMsg.PartialSignature[:])),
)

case types.BNRoleSyncCommittee:
blockRoot := beaconVote.BlockRoot
partialMsg, err := cr.BaseRunner.signBeaconObject(cr, duty, types.SSZBytes(blockRoot[:]), duty.DutySlot(),
Expand Down Expand Up @@ -284,12 +270,6 @@ func (cr *CommitteeRunner) ProcessConsensus(logger *zap.Logger, msg *types.Signe
return errors.Wrap(err, "could not create SignedSSVMessage from SSVMessage")
}

// TODO: (Alan) revert?
logger.Debug("📢 broadcasting post consensus message",
fields.Slot(duty.DutySlot()),
zap.Int("sigs", len(postConsensusMsg.Messages)),
)

if err := cr.GetNetwork().Broadcast(ssvMsg.MsgID, msgToBroadcast); err != nil {
return errors.Wrap(err, "can't broadcast partial post consensus sig")
}
Expand All @@ -303,39 +283,33 @@ func (cr *CommitteeRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *t
if err != nil {
return errors.Wrap(err, "failed processing post consensus message")
}
logger = logger.With(fields.Slot(signedMsg.Slot))

// TODO: (Alan) revert?
indices := make([]int, len(signedMsg.Messages))
signers := make([]uint64, len(signedMsg.Messages))
for i, msg := range signedMsg.Messages {
signers[i] = msg.Signer
indices[i] = int(msg.ValidatorIndex)
}

// Get unique roots to avoid repetition
rootSet := make(map[[32]byte]struct{})
for _, root := range roots {
rootSet[root] = struct{}{}
}

logger.Debug("got post consensus",
logger.Debug("🧩 got partial signatures",
zap.Bool("quorum", quorum),
fields.Slot(cr.BaseRunner.State.StartingDuty.DutySlot()),
zap.Int("signer", int(signedMsg.Messages[0].Signer)),
zap.Int("sigs", len(roots)),
zap.Ints("validators", indices),
)
zap.Ints("validators", indices))

if !quorum {
return nil
}

consensusDuration := cr.consensusDone.Sub(cr.started)
postConsensusDuration := time.Since(cr.postStarted)
totalDuration := consensusDuration + postConsensusDuration
cr.metrics.EndPostConsensus()

durationFields := []zap.Field{
fields.ConsensusTime(consensusDuration),
zap.String("post_consensus_time", strconv.FormatFloat(postConsensusDuration.Seconds(), 'f', 5, 64)),
zap.String("total_consensus_time", strconv.FormatFloat(totalDuration.Seconds(), 'f', 5, 64)),
fields.ConsensusTime(cr.metrics.GetConsensusTime()),
fields.PostConsensusTime(cr.metrics.GetPostConsensusTime()),
zap.Duration("total_consensus_time", time.Since(cr.started)),
}

// Get validator-root maps for attestations and sync committees, and the root-beacon object map
Expand All @@ -348,9 +322,13 @@ func (cr *CommitteeRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *t
attestationsToSubmit := make(map[phase0.ValidatorIndex]*phase0.Attestation)
syncCommitteeMessagesToSubmit := make(map[phase0.ValidatorIndex]*altair.SyncCommitteeMessage)

// Get unique roots to avoid repetition
rootSet := make(map[[32]byte]struct{})
for _, root := range roots {
rootSet[root] = struct{}{}
}
// For each root that got at least one quorum, find the duties associated to it and try to submit
for root := range rootSet {

// Get validators related to the given root
role, validators, found := findValidators(root, attestationMap, committeeMap)

Expand Down Expand Up @@ -408,7 +386,8 @@ func (cr *CommitteeRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *t
}
specSig := phase0.BLSSignature{}
copy(specSig[:], sig)

vlogger.Debug("🧩 reconstructed partial signatures",
zap.Uint64s("signers", getPostConsensusSigners(cr.BaseRunner.State, root)))
// Get the beacon object related to root
if _, exists := beaconObjects[validator]; !exists {
anyErr = errors.Wrap(err, "could not find beacon object for validator")
Expand Down Expand Up @@ -438,14 +417,22 @@ func (cr *CommitteeRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *t
}
}
}
logger = logger.With(durationFields...)
// Submit multiple attestations
attestations := make([]*phase0.Attestation, 0)
for _, att := range attestationsToSubmit {
attestations = append(attestations, att)
}
submmitionStart := time.Now()
if err := cr.beacon.SubmitAttestations(attestations); err != nil {
logger.Error("❌ failed to submit attestation", zap.Error(err))
return errors.Wrap(err, "could not submit to Beacon chain reconstructed attestation")
}

logger.Info("✅ successfully submitted attestations",
fields.SubmissionTime(time.Since(submmitionStart)),
fields.Height(cr.BaseRunner.QBFTController.Height),
fields.Round(cr.BaseRunner.State.RunningInstance.State.Round))
// Record successful submissions
for validator := range attestationsToSubmit {
cr.RecordSubmission(types.BNRoleAttester, validator)
Expand All @@ -456,9 +443,15 @@ func (cr *CommitteeRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *t
for _, syncMsg := range syncCommitteeMessagesToSubmit {
syncCommitteeMessages = append(syncCommitteeMessages, syncMsg)
}
submmitionStart = time.Now()
if err := cr.beacon.SubmitSyncMessages(syncCommitteeMessages); err != nil {
logger.Error("❌ failed to submit sync committee", zap.Error(err))
return errors.Wrap(err, "could not submit to Beacon chain reconstructed signed sync committee")
}
logger.Info("✅ successfully submitted sync committee",
fields.SubmissionTime(time.Since(submmitionStart)),
fields.Height(cr.BaseRunner.QBFTController.Height),
fields.Round(cr.BaseRunner.State.RunningInstance.State.Round))
// Record successful submissions
for validator := range syncCommitteeMessagesToSubmit {
cr.RecordSubmission(types.BNRoleSyncCommittee, validator)
Expand Down Expand Up @@ -638,12 +631,17 @@ func (cr *CommitteeRunner) expectedPostConsensusRootsAndBeaconObjects() (
}

func (cr *CommitteeRunner) executeDuty(logger *zap.Logger, duty types.Duty) error {
start := time.Now()
slot := duty.DutySlot()
//TODO committeeIndex is 0, is this correct?
attData, _, err := cr.GetBeaconNode().GetAttestationData(slot, 0)
if err != nil {
return errors.Wrap(err, "failed to get attestation data")
}
//TODO committeeIndex is 0, is this correct?
logger = logger.With(
zap.Duration("attestation_data_time", time.Since(start)),
fields.Slot(slot),
)

cr.started = time.Now()

Expand Down
4 changes: 2 additions & 2 deletions protocol/v2/ssv/runner/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
spectypes "github.com/ssvlabs/ssv-spec/types"
"log"
"time"

Expand All @@ -9,7 +10,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
)

var (
Expand Down Expand Up @@ -110,7 +110,7 @@ type ConsensusMetrics struct {
beaconDataDuration time.Duration
}

func NewConsensusMetrics(role spectypes.BeaconRole) ConsensusMetrics {
func NewConsensusMetrics(role spectypes.RunnerRole) ConsensusMetrics {
values := []string{role.String()}
return ConsensusMetrics{
preConsensus: metricsPreConsensusDuration.WithLabelValues(values...),
Expand Down
2 changes: 1 addition & 1 deletion protocol/v2/ssv/runner/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewProposerRunner(
valCheck: valCheck,
operatorSigner: operatorSigner,

metrics: metrics.NewConsensusMetrics(spectypes.BNRoleProposer),
metrics: metrics.NewConsensusMetrics(spectypes.RoleProposer),
}
}

Expand Down
2 changes: 1 addition & 1 deletion protocol/v2/ssv/runner/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ package runner
// valCheck: valCheck,
// operatorSigner: operatorSigner,
//
// metrics: metrics.NewConsensusMetrics(spectypes.BNRoleSyncCommittee),
// metrics: metrics.NewConsensusMetrics(spectypes.RoleSyncCommittee),
// }
//}
//
Expand Down
2 changes: 1 addition & 1 deletion protocol/v2/ssv/runner/sync_committee_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewSyncCommitteeAggregatorRunner(
valCheck: valCheck,
operatorSigner: operatorSigner,

metrics: metrics.NewConsensusMetrics(spectypes.BNRoleSyncCommitteeContribution),
metrics: metrics.NewConsensusMetrics(spectypes.RoleSyncCommitteeContribution),
}
}

Expand Down
3 changes: 1 addition & 2 deletions protocol/v2/ssv/runner/validator_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"

"github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"

v1 "github.com/attestantio/go-eth2-client/api/v1"
Expand Down Expand Up @@ -54,7 +53,7 @@ func NewValidatorRegistrationRunner(
signer: signer,
operatorSigner: operatorSigner,

metrics: metrics.NewConsensusMetrics(spectypes.BNRoleValidatorRegistration),
metrics: metrics.NewConsensusMetrics(spectypes.RoleValidatorRegistration),
}
}

Expand Down
3 changes: 1 addition & 2 deletions protocol/v2/ssv/runner/voluntary_exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"

"github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"

"github.com/attestantio/go-eth2-client/spec/phase0"
Expand Down Expand Up @@ -53,7 +52,7 @@ func NewVoluntaryExitRunner(
signer: signer,
operatorSigner: operatorSigner,

metrics: metrics.NewConsensusMetrics(spectypes.BNRoleVoluntaryExit),
metrics: metrics.NewConsensusMetrics(spectypes.RoleVoluntaryExit),
}
}

Expand Down

0 comments on commit ebe4cef

Please sign in to comment.