Skip to content

Commit

Permalink
splitted startduty and consume queue flows to prevent spec tests data…
Browse files Browse the repository at this point in the history
… race
  • Loading branch information
AKorpusenko committed Jul 25, 2024
1 parent 3c78dcf commit fe32126
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 36 deletions.
16 changes: 1 addition & 15 deletions protocol/v2/ssv/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (b *BaseRunner) basePreConsensusMsgProcessing(runner Runner, signedMsg *spe
func (b *BaseRunner) baseConsensusMsgProcessing(logger *zap.Logger, runner Runner, msg *spectypes.SignedSSVMessage) (decided bool, decidedValue spectypes.Encoder, err error) {
prevDecided := false
if b.hasRunningDuty() && b.State != nil && b.State.RunningInstance != nil {
prevDecided, _ = b.IsRunningInstanceDecided()
prevDecided, _ = b.State.RunningInstance.IsDecided()
}

// TODO: revert after pre-consensus liveness is fixed
Expand All @@ -187,9 +187,7 @@ func (b *BaseRunner) baseConsensusMsgProcessing(logger *zap.Logger, runner Runne
}
}

b.mtx.Lock()
decidedMsg, err := b.QBFTController.ProcessMsg(logger, msg)
b.mtx.Unlock()
b.compactInstanceIfNeeded(msg)
if err != nil {
return false, nil, err
Expand Down Expand Up @@ -377,15 +375,3 @@ func (b *BaseRunner) ShouldProcessNonBeaconDuty(duty spectypes.Duty) error {
}
return nil
}

func (b *BaseRunner) IsRunningInstanceDecided() (bool, []byte) {
b.mtx.RLock()
defer b.mtx.RUnlock()
return b.State.RunningInstance.IsDecided()
}

func (b *BaseRunner) GetStateProposalAcceptedForCurrentRound() *spectypes.SignedSSVMessage {
b.mtx.RLock()
defer b.mtx.RUnlock()
return b.State.RunningInstance.State.ProposalAcceptedForCurrentRound
}
20 changes: 17 additions & 3 deletions protocol/v2/ssv/validator/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewCommittee(
operator *spectypes.CommitteeMember,
verifier spectypes.SignatureVerifier,
createRunnerFn CommitteeRunnerFunc,
// share map[phase0.ValidatorIndex]*spectypes.Share, // TODO Shouldn't we pass the shares map here the same way we do in spec?
// share map[phase0.ValidatorIndex]*spectypes.Share, // TODO Shouldn't we pass the shares map here the same way we do in spec?
) *Committee {
return &Committee{
logger: logger,
Expand Down Expand Up @@ -87,6 +87,22 @@ func (c *Committee) RemoveShare(validatorIndex phase0.ValidatorIndex) {
}
}

func (c *Committee) StartConsumeQueue(logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
c.mtx.Lock()
defer c.mtx.Unlock()

r := c.Runners[duty.Slot]
if r == nil {
return errors.New(fmt.Sprintf("no runner found for slot %d", duty.Slot))
}

go func() {
err := c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, r)
logger.Error("failed consuming queue", zap.Error(err))
}()
return nil
}

// StartDuty starts a new duty for the given slot
func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
c.logger.Debug("Starting committee duty runner", zap.Uint64("slot", uint64(duty.Slot)))
Expand Down Expand Up @@ -158,8 +174,6 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty)
}

logger = c.logger.With(fields.DutyID(fields.FormatCommitteeDutyID(c.Operator.Committee, c.BeaconNetwork.EstimatedEpochAtSlot(duty.Slot), duty.Slot)), fields.Slot(duty.Slot))
// TODO alan: stop queue
go c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, c.Runners[duty.Slot])

logger.Info("ℹ️ starting duty processing")
return c.Runners[duty.Slot].StartNewDuty(logger, duty, c.Operator.GetQuorum())
Expand Down
29 changes: 13 additions & 16 deletions protocol/v2/ssv/validator/committee_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (v *Committee) ConsumeQueue(logger *zap.Logger, slot phase0.Slot, handler M
if runner.HasRunningDuty() {
runningInstance = runner.GetBaseRunner().State.RunningInstance
if runningInstance != nil {
decided, _ := runner.GetBaseRunner().IsRunningInstanceDecided()
decided, _ := runningInstance.IsDecided()
state.HasRunningInstance = !decided
}
}
Expand All @@ -125,22 +125,19 @@ func (v *Committee) ConsumeQueue(logger *zap.Logger, slot phase0.Slot, handler M
}
return e.Type == types.ExecuteDuty
}
} else {
proposalAcceptedForCurrentRound := runner.GetBaseRunner().GetStateProposalAcceptedForCurrentRound()
if runningInstance != nil && proposalAcceptedForCurrentRound == nil {
// If no proposal was accepted for the current round, skip prepare & commit messages
// for the current height and round.
filter = func(m *queue.DecodedSSVMessage) bool {
sm, ok := m.Body.(*specqbft.Message)
if !ok {
return true
}

if sm.Height != state.Height || sm.Round != state.Round {
return true
}
return sm.MsgType != specqbft.PrepareMsgType && sm.MsgType != specqbft.CommitMsgType
} else if runningInstance != nil && runningInstance.State.ProposalAcceptedForCurrentRound == nil {
// If no proposal was accepted for the current round, skip prepare & commit messages
// for the current height and round.
filter = func(m *queue.DecodedSSVMessage) bool {
sm, ok := m.Body.(*specqbft.Message)
if !ok {
return true
}

if sm.Height != state.Height || sm.Round != state.Round {
return true
}
return sm.MsgType != specqbft.PrepareMsgType && sm.MsgType != specqbft.CommitMsgType
}
}

Expand Down
4 changes: 4 additions & 0 deletions protocol/v2/ssv/validator/duty_executer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,9 @@ func (c *Committee) OnExecuteDuty(logger *zap.Logger, msg *types.EventMsg) error
return fmt.Errorf("could not start committee duty: %w", err)
}

if err := c.StartConsumeQueue(logger, executeDutyData.Duty); err != nil {
return fmt.Errorf("could not start committee consume queue: %w", err)
}

return nil
}
2 changes: 1 addition & 1 deletion protocol/v2/ssv/validator/non_committee_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.DecodedSSVMessage) error
return fmt.Errorf("failed to get partial signature message from network message %w", err)
}
if partialSigMessages.Type != spectypes.PostConsensusPartialSig {
return fmt.Errorf("not processing message type %s", partialSigMessages.Type)
return fmt.Errorf("not processing message type %b", partialSigMessages.Type)
}

slot := partialSigMessages.Slot
Expand Down
2 changes: 1 addition & 1 deletion protocol/v2/testing/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func ExtractTarGz(gzipStream io.Reader) {

default:
log.Fatalf(
"ExtractTarGz: uknown type: %s in %s",
"ExtractTarGz: uknown type: %b in %s",
header.Typeflag,
header.Name)
}
Expand Down

0 comments on commit fe32126

Please sign in to comment.