diff --git a/protocol/v2/ssv/runner/runner.go b/protocol/v2/ssv/runner/runner.go index 17af764445..a3be4f36e4 100644 --- a/protocol/v2/ssv/runner/runner.go +++ b/protocol/v2/ssv/runner/runner.go @@ -177,7 +177,7 @@ func (b *BaseRunner) basePreConsensusMsgProcessing(runner Runner, signedMsg *spe func (b *BaseRunner) baseConsensusMsgProcessing(logger *zap.Logger, runner Runner, msg *spectypes.SignedSSVMessage) (decided bool, decidedValue spectypes.Encoder, err error) { prevDecided := false if b.hasRunningDuty() && b.State != nil && b.State.RunningInstance != nil { - prevDecided, _ = b.IsRunningInstanceDecided() + prevDecided, _ = b.State.RunningInstance.IsDecided() } // TODO: revert after pre-consensus liveness is fixed @@ -187,9 +187,7 @@ func (b *BaseRunner) baseConsensusMsgProcessing(logger *zap.Logger, runner Runne } } - b.mtx.Lock() decidedMsg, err := b.QBFTController.ProcessMsg(logger, msg) - b.mtx.Unlock() b.compactInstanceIfNeeded(msg) if err != nil { return false, nil, err @@ -377,15 +375,3 @@ func (b *BaseRunner) ShouldProcessNonBeaconDuty(duty spectypes.Duty) error { } return nil } - -func (b *BaseRunner) IsRunningInstanceDecided() (bool, []byte) { - b.mtx.RLock() - defer b.mtx.RUnlock() - return b.State.RunningInstance.IsDecided() -} - -func (b *BaseRunner) GetStateProposalAcceptedForCurrentRound() *spectypes.SignedSSVMessage { - b.mtx.RLock() - defer b.mtx.RUnlock() - return b.State.RunningInstance.State.ProposalAcceptedForCurrentRound -} diff --git a/protocol/v2/ssv/validator/committee.go b/protocol/v2/ssv/validator/committee.go index a0675d482a..fd0baa3f41 100644 --- a/protocol/v2/ssv/validator/committee.go +++ b/protocol/v2/ssv/validator/committee.go @@ -54,7 +54,7 @@ func NewCommittee( operator *spectypes.CommitteeMember, verifier spectypes.SignatureVerifier, createRunnerFn CommitteeRunnerFunc, - // share map[phase0.ValidatorIndex]*spectypes.Share, // TODO Shouldn't we pass the shares map here the same way we do in spec? 
+// share map[phase0.ValidatorIndex]*spectypes.Share, // TODO Shouldn't we pass the shares map here the same way we do in spec?
 ) *Committee {
 	return &Committee{
 		logger:          logger,
@@ -87,6 +87,24 @@ func (c *Committee) RemoveShare(validatorIndex phase0.ValidatorIndex) {
 	}
 }
 
+// StartConsumeQueue starts consuming the message queue for the runner registered at the duty's slot.
+func (c *Committee) StartConsumeQueue(logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+
+	r := c.Runners[duty.Slot]
+	if r == nil {
+		return fmt.Errorf("no runner found for slot %d", duty.Slot)
+	}
+
+	go func() {
+		// Only log on an actual failure; ConsumeQueue returning nil is the normal shutdown path.
+		if err := c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, r); err != nil {
+			logger.Error("failed consuming queue", zap.Error(err))
+		}
+	}()
+	return nil
+}
+
 // StartDuty starts a new duty for the given slot
 func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
 	c.logger.Debug("Starting committee duty runner", zap.Uint64("slot", uint64(duty.Slot)))
@@ -158,8 +176,6 @@ func (c *Committee) StartDuty(logger *zap.Logger, duty *spectypes.CommitteeDuty)
 	}
 
 	logger = c.logger.With(fields.DutyID(fields.FormatCommitteeDutyID(c.Operator.Committee, c.BeaconNetwork.EstimatedEpochAtSlot(duty.Slot), duty.Slot)), fields.Slot(duty.Slot))
-	// TODO alan: stop queue
-	go c.ConsumeQueue(logger, duty.Slot, c.ProcessMessage, c.Runners[duty.Slot])
 
 	logger.Info("ℹ️ starting duty processing")
 	return c.Runners[duty.Slot].StartNewDuty(logger, duty, c.Operator.GetQuorum())
diff --git a/protocol/v2/ssv/validator/committee_queue.go b/protocol/v2/ssv/validator/committee_queue.go
index 0c3d138744..7b5f17fc17 100644
--- a/protocol/v2/ssv/validator/committee_queue.go
+++ b/protocol/v2/ssv/validator/committee_queue.go
@@ -110,7 +110,7 @@ func (v *Committee) ConsumeQueue(logger *zap.Logger, slot phase0.Slot, handler M
 		if runner.HasRunningDuty() {
 			runningInstance = runner.GetBaseRunner().State.RunningInstance
 			if runningInstance != nil {
-				decided, _ := runner.GetBaseRunner().IsRunningInstanceDecided()
+				decided, _ := 
runningInstance.IsDecided() state.HasRunningInstance = !decided } } @@ -125,22 +125,19 @@ func (v *Committee) ConsumeQueue(logger *zap.Logger, slot phase0.Slot, handler M } return e.Type == types.ExecuteDuty } - } else { - proposalAcceptedForCurrentRound := runner.GetBaseRunner().GetStateProposalAcceptedForCurrentRound() - if runningInstance != nil && proposalAcceptedForCurrentRound == nil { - // If no proposal was accepted for the current round, skip prepare & commit messages - // for the current height and round. - filter = func(m *queue.DecodedSSVMessage) bool { - sm, ok := m.Body.(*specqbft.Message) - if !ok { - return true - } - - if sm.Height != state.Height || sm.Round != state.Round { - return true - } - return sm.MsgType != specqbft.PrepareMsgType && sm.MsgType != specqbft.CommitMsgType + } else if runningInstance != nil && runningInstance.State.ProposalAcceptedForCurrentRound == nil { + // If no proposal was accepted for the current round, skip prepare & commit messages + // for the current height and round. 
+			filter = func(m *queue.DecodedSSVMessage) bool {
+				sm, ok := m.Body.(*specqbft.Message)
+				if !ok {
+					return true
+				}
+
+				if sm.Height != state.Height || sm.Round != state.Round {
+					return true
 				}
+				return sm.MsgType != specqbft.PrepareMsgType && sm.MsgType != specqbft.CommitMsgType
 			}
 		}
diff --git a/protocol/v2/ssv/validator/duty_executer.go b/protocol/v2/ssv/validator/duty_executer.go
index bd4e2d7c18..ab1957d3d0 100644
--- a/protocol/v2/ssv/validator/duty_executer.go
+++ b/protocol/v2/ssv/validator/duty_executer.go
@@ -38,5 +38,9 @@ func (c *Committee) OnExecuteDuty(logger *zap.Logger, msg *types.EventMsg) error
 		return fmt.Errorf("could not start committee duty: %w", err)
 	}
 
+	if err := c.StartConsumeQueue(logger, executeDutyData.Duty); err != nil {
+		return fmt.Errorf("could not start committee consume queue: %w", err)
+	}
+
 	return nil
 }
diff --git a/protocol/v2/ssv/validator/non_committee_validator.go b/protocol/v2/ssv/validator/non_committee_validator.go
index b7a9f5b7d0..f509cd2d7d 100644
--- a/protocol/v2/ssv/validator/non_committee_validator.go
+++ b/protocol/v2/ssv/validator/non_committee_validator.go
@@ -83,7 +83,7 @@ func (ncv *CommitteeObserver) ProcessMessage(msg *queue.DecodedSSVMessage) error
 		return fmt.Errorf("failed to get partial signature message from network message %w", err)
 	}
 	if partialSigMessages.Type != spectypes.PostConsensusPartialSig {
-		return fmt.Errorf("not processing message type %s", partialSigMessages.Type)
+		return fmt.Errorf("not processing message type %d", partialSigMessages.Type)
 	}
 
 	slot := partialSigMessages.Slot
diff --git a/protocol/v2/testing/test_utils.go b/protocol/v2/testing/test_utils.go
index 0df2e3aa0c..8780a1e7c9 100644
--- a/protocol/v2/testing/test_utils.go
+++ b/protocol/v2/testing/test_utils.go
@@ -281,7 +281,7 @@ func ExtractTarGz(gzipStream io.Reader) {
 
 		default:
 			log.Fatalf(
-				"ExtractTarGz: uknown type: %s in %s",
+				"ExtractTarGz: unknown type: %d in %s",
 				header.Typeflag,
 				header.Name)
 		}