Skip to content

Commit

Permalink
removed locking locking all consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
AKorpusenko committed Jul 22, 2024
1 parent 1abf4cb commit 3c78dcf
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
16 changes: 15 additions & 1 deletion protocol/v2/ssv/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (b *BaseRunner) basePreConsensusMsgProcessing(runner Runner, signedMsg *spe
func (b *BaseRunner) baseConsensusMsgProcessing(logger *zap.Logger, runner Runner, msg *spectypes.SignedSSVMessage) (decided bool, decidedValue spectypes.Encoder, err error) {
prevDecided := false
if b.hasRunningDuty() && b.State != nil && b.State.RunningInstance != nil {
prevDecided, _ = b.State.RunningInstance.IsDecided()
prevDecided, _ = b.IsRunningInstanceDecided()
}

// TODO: revert after pre-consensus liveness is fixed
Expand All @@ -187,7 +187,9 @@ func (b *BaseRunner) baseConsensusMsgProcessing(logger *zap.Logger, runner Runne
}
}

b.mtx.Lock()
decidedMsg, err := b.QBFTController.ProcessMsg(logger, msg)
b.mtx.Unlock()
b.compactInstanceIfNeeded(msg)
if err != nil {
return false, nil, err
Expand Down Expand Up @@ -375,3 +377,15 @@ func (b *BaseRunner) ShouldProcessNonBeaconDuty(duty spectypes.Duty) error {
}
return nil
}

func (b *BaseRunner) IsRunningInstanceDecided() (bool, []byte) {
b.mtx.RLock()
defer b.mtx.RUnlock()
return b.State.RunningInstance.IsDecided()
}

func (b *BaseRunner) GetStateProposalAcceptedForCurrentRound() *spectypes.SignedSSVMessage {
b.mtx.RLock()
defer b.mtx.RUnlock()
return b.State.RunningInstance.State.ProposalAcceptedForCurrentRound
}
2 changes: 1 addition & 1 deletion protocol/v2/ssv/validator/committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func (c *Committee) ProcessMessage(logger *zap.Logger, msg *queue.DecodedSSVMess
return errors.Wrap(err, "invalid qbft Message")
}
c.mtx.Lock()
defer c.mtx.Unlock() // read c.Runners map + ProcessConsensus r/w state
runner, exists := c.Runners[phase0.Slot(qbftMsg.Height)]
c.mtx.Unlock()
if !exists {
return errors.New("no runner found for message's slot")
}
Expand Down
8 changes: 2 additions & 6 deletions protocol/v2/ssv/validator/committee_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ func (v *Committee) ConsumeQueue(logger *zap.Logger, slot phase0.Slot, handler M
if runner.HasRunningDuty() {
runningInstance = runner.GetBaseRunner().State.RunningInstance
if runningInstance != nil {
v.mtx.RLock()
decided, _ := runningInstance.IsDecided()
decided, _ := runner.GetBaseRunner().IsRunningInstanceDecided()
state.HasRunningInstance = !decided
v.mtx.RUnlock()
}
}

Expand All @@ -128,9 +126,7 @@ func (v *Committee) ConsumeQueue(logger *zap.Logger, slot phase0.Slot, handler M
return e.Type == types.ExecuteDuty
}
} else {
v.mtx.RLock()
proposalAcceptedForCurrentRound := runningInstance.State.ProposalAcceptedForCurrentRound
v.mtx.RUnlock()
proposalAcceptedForCurrentRound := runner.GetBaseRunner().GetStateProposalAcceptedForCurrentRound()
if runningInstance != nil && proposalAcceptedForCurrentRound == nil {
// If no proposal was accepted for the current round, skip prepare & commit messages
// for the current height and round.
Expand Down

0 comments on commit 3c78dcf

Please sign in to comment.