Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: assert on HardState field modifications #133169

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6188,7 +6188,7 @@ func TestRaftForceCampaignPreVoteCheckQuorum(t *testing.T) {

// Force-campaign n3. It may not win or hold onto leadership, but it's enough
// to know that it bumped the term.
repl3.ForceCampaign(ctx)
repl3.ForceCampaign(ctx, initialStatus)
t.Logf("n3 campaigning")

var leaderStatus *raft.Status
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,10 @@ func (r *Replica) Campaign(ctx context.Context) {
}

// ForceCampaign force-campaigns the replica.
func (r *Replica) ForceCampaign(ctx context.Context) {
func (r *Replica) ForceCampaign(ctx context.Context, raftStatus raft.BasicStatus) {
r.mu.Lock()
defer r.mu.Unlock()
r.forceCampaignLocked(ctx)
r.forceCampaignLocked(ctx, raftStatus)
}

// LastAssignedLeaseIndexRLocked is like LastAssignedLeaseIndex, but requires
Expand Down
25 changes: 21 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
raftStatus := raftGroup.BasicStatus()
if shouldCampaignAfterConfChange(ctx, r.store.ClusterSettings(), r.store.StoreID(),
r.descRLocked(), raftStatus, leaseStatus) {
r.forceCampaignLocked(ctx)
r.forceCampaignLocked(ctx, raftStatus)
}
}

Expand Down Expand Up @@ -2613,11 +2613,28 @@ func (r *Replica) campaignLocked(ctx context.Context) {
// simply partitioned away from it and/or liveness.
//
// TODO(nvanbenschoten): this is the remaining logic which needs work in order
// to complete #125254. See the comment in raft.go about how even a local
// to complete #129796. See the comment in raft.go about how even a local
// fortification check is not enough to make MsgTimeoutNow safe.
func (r *Replica) forceCampaignLocked(ctx context.Context) {
func (r *Replica) forceCampaignLocked(ctx context.Context, raftStatus raft.BasicStatus) {
log.VEventf(ctx, 3, "force campaigning")
msg := raftpb.Message{To: raftpb.PeerID(r.replicaID), Type: raftpb.MsgTimeoutNow}
msg := raftpb.Message{
To: raftpb.PeerID(r.replicaID),
// We pretend that the message was sent from the leader, who we know is set
// in the status because we checked in shouldCampaignAfterConfChange.
//
// As the TODO above implies, this is a hack and is borderline unsafe. Only
// the leader should be able to send a MsgTimeoutNow, because a "force"
// election gives the candidate permission to tell all voters to violate the
// leader's fortification. In this case, we lie to raft about this message
// coming from the leader, which in some ways it is because the leader is
// the one committing the entry which demotes itself to a learner.
//
// We should find a way to get rid of this, perhaps through the approach
// presented in #133308.
From: raftStatus.Lead,
Term: raftStatus.Term,
Type: raftpb.MsgTimeoutNow,
}
if err := r.mu.internalRaftGroup.Step(msg); err != nil {
log.VEventf(ctx, 1, "failed to campaign: %s", err)
}
Expand Down
127 changes: 79 additions & 48 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,6 @@ type raft struct {
// leadEpoch, if set, corresponds to the StoreLiveness epoch that this peer
// has supported the leader in. It's unset if the peer hasn't supported the
// current leader.
//
// TODO(arul): This should be populated when responding to a MsgFortify.
leadEpoch pb.Epoch
// leadTransferee, if set, is the id of the leader transfer target during a
// pending leadership transfer. The value is set while the outgoing leader
Expand Down Expand Up @@ -444,7 +442,6 @@ func newRaft(c *Config) *raft {

r := &raft{
id: c.ID,
lead: None,
isLearner: false,
raftLog: raftlog,
maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
Expand Down Expand Up @@ -818,7 +815,7 @@ func (r *raft) sendFortify(to pb.PeerID) {
// Doing so avoids a self-addressed message.
epoch, live := r.storeLiveness.SupportFor(r.lead)
if live {
r.leadEpoch = epoch
r.setLeadEpoch(epoch)
// The leader needs to persist the LeadEpoch durably before it can start
// supporting itself. We do so by sending a self-addressed
// MsgFortifyLeaderResp message so that it is added to the msgsAfterAppend
Expand Down Expand Up @@ -1033,10 +1030,7 @@ func (r *raft) reset(term uint64) {
// de-fortification.
assertTrue(!r.supportingFortifiedLeader() || r.lead == r.id,
"should not be changing terms when supporting a fortified leader; leader exempted")
r.Term = term
r.Vote = None
r.lead = None
r.leadEpoch = 0
r.setTerm(term)
}

r.electionElapsed = 0
Expand All @@ -1063,6 +1057,51 @@ func (r *raft) reset(term uint64) {
r.uncommittedSize = 0
}

// setTerm moves the peer to the given term. Passing the current term is a
// no-op. For any other value it asserts that the term is moving forward and
// then clears the state recorded under the previous term: the vote, the known
// leader, and the leader epoch.
func (r *raft) setTerm(term uint64) {
	if r.Term != term {
		assertTrue(term > r.Term, "term cannot regress")
		r.Term, r.Vote = term, None
		r.lead, r.leadEpoch = None, 0
	}
}

// setVote records id as the peer this node has voted for. Recording the vote
// that is already held is a no-op; any other change asserts that no vote is
// currently recorded (r.Vote == None) before overwriting it.
func (r *raft) setVote(id pb.PeerID) {
	if r.Vote != id {
		assertTrue(r.Vote == None, "cannot change vote")
		r.Vote = id
	}
}

// setLead records lead as the known leader. Setting the leader that is already
// known is a no-op; any other change asserts that no leader is currently
// recorded (r.lead == None) before overwriting it.
func (r *raft) setLead(lead pb.PeerID) {
	if r.lead != lead {
		assertTrue(r.lead == None, "cannot change lead")
		r.lead = lead
	}
}

// resetLead forgets the current leader, clearing both the leader ID and the
// leader epoch. It performs no assertions.
func (r *raft) resetLead() {
	r.lead, r.leadEpoch = None, 0
}

// setLeadEpoch records leadEpoch as the epoch stored for the current leader.
// Storing the value that is already held is a no-op. Otherwise it asserts, in
// order, that a leader is set and that the epoch is strictly increasing, and
// then stores the new value.
func (r *raft) setLeadEpoch(leadEpoch pb.Epoch) {
	if r.leadEpoch != leadEpoch {
		assertTrue(r.lead != None, "leader must be set")
		assertTrue(leadEpoch > r.leadEpoch, "leadEpoch cannot regress")
		r.leadEpoch = leadEpoch
	}
}

func (r *raft) resetLeadEpoch() {
r.leadEpoch = 0
Copy link
Collaborator

@pav-kv pav-kv Oct 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-actionable, but just curious about the fortification protocol: is it safe to reset leadEpoch to 0 in the de-fortify case?

Can the leadEpoch be set to non-zero again during the course of the term? If yes, can it be set to a value below / different than the last non-zero leadEpoch? As a thought experiment: what happens when messages are delayed / reordered / duplicated: can we get in and out of leadEpoch == 0 / in what ways, and is it fine?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is safe to reset to 0 if either (1) the leader has told us to defortify, or (2) we have withdrawn support for the lead epoch in the store liveness network.

leadEpoch can then be set to a non-zero value again if a new MsgFortifyLeader is received, which is by design. However, the value must be equal to or greater than it was before it was reset to 0, because it comes from a call to storeLiveness.SupportFor, which is guaranteed to be monotonic.

}

func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
last := r.raftLog.lastEntryID()
for i := range es {
Expand Down Expand Up @@ -1214,7 +1253,7 @@ func (r *raft) becomeFollower(term uint64, lead pb.PeerID) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.setLead(lead)
r.state = pb.StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
Expand All @@ -1227,7 +1266,7 @@ func (r *raft) becomeCandidate() {
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.setVote(r.id)
r.state = pb.StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
Expand All @@ -1250,8 +1289,7 @@ func (r *raft) becomePreCandidate() {
// leader leases, this is fine, because we wouldn't be here unless we'd
// revoked StoreLiveness support for the leader's store to begin with. It's
// a bit weird from the perspective of raft though. See if we can avoid this.
r.lead = None
r.leadEpoch = 0
r.resetLead()
r.state = pb.StatePreCandidate
r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}
Expand All @@ -1269,7 +1307,7 @@ func (r *raft) becomeLeader() {
// and not even when learning of a leader in a later term.
r.fortificationTracker.Reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.setLead(r.id)
r.state = pb.StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
Expand Down Expand Up @@ -1622,7 +1660,7 @@ func (r *raft) Step(m pb.Message) error {
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
r.setVote(m.From)
}
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
Expand Down Expand Up @@ -2017,6 +2055,13 @@ func stepLeader(r *raft, m pb.Message) error {
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
if IsMsgFromLeader(m.Type) {
// If this is a message from a leader of r.Term, transition to a follower
// with the sender of the message as the leader, then process the message.
assertTrue(m.Term == r.Term, "message term should equal current term")
r.becomeFollower(m.Term, m.From)
return r.step(r, m) // stepFollower
}
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
Expand All @@ -2030,18 +2075,11 @@ func stepCandidate(r *raft, m pb.Message) error {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
// TODO(nvanbenschoten): we can't consider MsgSnap to be from the leader of
// Message.Term until we address #127348 and #127349.
r.becomeFollower(m.Term, None)
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleFortify(m)
case myVoteRespType:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
Expand All @@ -2059,13 +2097,20 @@ func stepCandidate(r *raft, m pb.Message) error {
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, r.lead)
}
case pb.MsgTimeoutNow:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removal seems unrelated to this change. Is it removed because the message is not useful? LGTM, but just checking the reason for it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is removed because MsgTimeoutNow is caught by IsMsgFromLeader. With this change, a candidate will now handle the MsgTimeoutNow as an instruction to force campaign at the next term, instead of ignoring it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like a behaviour change (previously we would stay candidate because we've already started election and don't need to do it again; now we will step back to follower and re-campaign again?). I don't have a good sense whether it's a benign change. Would it be possible to retain the old behaviour?

Also, do we need to assume MsgTimeoutNow came from the leader? It's a forceful campaign, and in theory any node could randomly decide to do that without the leader's involvement. Perhaps the uncertainty comes from the fact that it's used both for forced campaigns and leader transfers (and in the latter case it makes sense to say that the message is made on behalf of this leader).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is new behavior. But it's desirable behavior. I've extracted it into #133490.

Also, do we need to assume MsgTimeoutNow came from the leader? It's a forceful campaign, and in theory any node could randomly decide to do that without the leader's involvement. Perhaps the uncertainty comes from the fact that it's used both for forced campaigns and leader transfers (and in the latter case it makes sense to say that the message is made on behalf of this leader).

It used to be disruptive but safe to allow any node to decide to perform a force campaign. With leader fortification and leader leases, it is now very unsafe. A force campaign will instruct any voter to ignore the fortification promise that it has made to the leader, which could cause a lease expiration regression (overlapping leases). Force campaigns are only safe if the leader is the one initiating them, because it will step down to a follower at the same time.

used both for forced campaigns and leader transfers

The notion of a "forced campaign", separate from a leader transfer, is relatively new. It came from #104969 and I'd like to eliminate the concept with something like #133308.

r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}

func stepFollower(r *raft, m pb.Message) error {
if IsMsgFromLeader(m.Type) {
r.setLead(m.From)
if m.Type != pb.MsgDeFortifyLeader {
// If we receive any message from the leader except a MsgDeFortifyLeader,
// we know that the leader is still alive and still acting as the leader,
// so reset the election timer.
r.electionElapsed = 0
}
}
switch m.Type {
case pb.MsgProp:
if r.lead == None {
Expand All @@ -2081,25 +2126,12 @@ func stepFollower(r *raft, m pb.Message) error {
m.To = r.lead
r.send(m)
case pb.MsgApp:
r.electionElapsed = 0
// TODO(arul): Once r.lead != None, we shouldn't need to update r.lead
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
// anymore within the course of a single term (in the context of which this
// function is always called). Instead, if r.lead != None, we should be able
// to assert that the leader hasn't changed within a given term. Maybe at
// the caller itself.
r.lead = m.From
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
r.handleHeartbeat(m)
case pb.MsgSnap:
r.electionElapsed = 0
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.electionElapsed = 0
r.lead = m.From
r.handleFortify(m)
case pb.MsgDeFortifyLeader:
r.handleDeFortify(m)
Expand All @@ -2126,8 +2158,7 @@ func stepFollower(r *raft, m pb.Message) error {
return nil
}
r.logger.Infof("%x forgetting leader %x at term %d", r.id, r.lead, r.Term)
r.lead = None
r.leadEpoch = 0
r.resetLead()
case pb.MsgTimeoutNow:
// TODO(nvanbenschoten): we will eventually want some kind of logic like
// this. However, even this may not be enough, because we're calling a
Expand All @@ -2148,7 +2179,7 @@ func stepFollower(r *raft, m pb.Message) error {
// be able to replace this leadEpoch assignment with a call to deFortify.
// Currently, it may panic because only the leader should be able to
// de-fortify without bumping the term.
r.leadEpoch = 0
r.resetLeadEpoch()
r.hup(campaignTransfer)
}
return nil
Expand Down Expand Up @@ -2316,7 +2347,7 @@ func (r *raft) handleFortify(m pb.Message) {
})
return
}
r.leadEpoch = epoch
r.setLeadEpoch(epoch)
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Expand Down Expand Up @@ -2378,7 +2409,7 @@ func (r *raft) deFortify(from pb.PeerID, term uint64) {
(term == r.Term && from == r.id && !r.supportingFortifiedLeader()),
"can only defortify at current term if told by the leader or if fortification has expired",
)
r.leadEpoch = 0
r.resetLeadEpoch()
}

// restore recovers the state machine from a snapshot. It restores the log and the
Expand Down Expand Up @@ -2584,10 +2615,10 @@ func (r *raft) loadState(state pb.HardState) {
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
}
r.raftLog.committed = state.Commit
r.Term = state.Term
r.Vote = state.Vote
r.lead = state.Lead
r.leadEpoch = state.LeadEpoch
r.setTerm(state.Term)
r.setVote(state.Vote)
r.setLead(state.Lead)
r.setLeadEpoch(state.LeadEpoch)
}

// pastElectionTimeout returns true if r.electionElapsed is greater
Expand Down
27 changes: 26 additions & 1 deletion pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2615,7 +2615,7 @@ func TestRestoreFromSnapMsg(t *testing.T) {
sm := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
sm.Step(m)

assert.Equal(t, pb.PeerID(1), sm.lead)
assert.Equal(t, None, sm.lead)
// TODO(bdarnell): what should this test?
}

Expand Down Expand Up @@ -3099,6 +3099,31 @@ func TestLeaderTransferToSlowFollower(t *testing.T) {
checkLeaderTransferState(t, lead, pb.StateFollower, 3)
}

// TestLeaderTransferToCandidate checks that a leadership transfer aimed at a
// peer that is currently a pre-candidate succeeds: the target ends up as the
// leader at the next term.
func TestLeaderTransferToCandidate(t *testing.T) {
	nw := newNetworkWithConfig(preVoteConfigWithFortificationDisabled, nil, nil, nil)
	target := nw.peers[3].(*raft)

	// Elect node 1 as the leader of term 1 and confirm node 3 observed the term.
	nw.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
	require.Equal(t, uint64(1), target.Term)

	// Isolate node 3 and tick it through its randomized election timeout so
	// that it decides to become a pre-candidate. Its term is still 1.
	nw.isolate(3)
	for ticks := 0; ticks < target.randomizedElectionTimeout; ticks++ {
		target.tick()
	}
	require.Equal(t, pb.StatePreCandidate, target.state)
	require.Equal(t, uint64(1), target.Term)

	// Reconnect node 3 and initiate a transfer of leadership from node 1 to
	// node 3, all before node 3 steps back to a follower. This instructs node 3
	// to call an election at the next term, which it can and does win.
	nw.recover()
	nw.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
	require.Equal(t, pb.StateLeader, target.state)
	require.Equal(t, uint64(2), target.Term)
}

func TestLeaderTransferAfterSnapshot(t *testing.T) {
nt := newNetwork(nil, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestRawNodeStep(t *testing.T) {
rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
require.NoError(t, err, "#%d", i)
msgt := pb.MessageType(i)
err = rawNode.Step(pb.Message{Type: msgt})
err = rawNode.Step(pb.Message{Type: msgt, From: 2})
// LocalMsg should be ignored.
if IsLocalMsg(msgt) {
assert.Equal(t, ErrStepLocalMsg, err, "#%d", i)
Expand Down
6 changes: 4 additions & 2 deletions pkg/raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ var isResponseMsg = [...]bool{
}

var isMsgFromLeader = [...]bool{
pb.MsgApp: true,
pb.MsgSnap: true,
pb.MsgApp: true,
// TODO(nvanbenschoten): we can't consider MsgSnap to be from the leader of
// Message.Term until we address #127348 and #127349.
// pb.MsgSnap: true,
pb.MsgHeartbeat: true,
pb.MsgTimeoutNow: true,
pb.MsgFortifyLeader: true,
Expand Down
Loading
Loading