Skip to content

Commit

Permalink
kvserver: get TestRaftCheckQuorum working with leader leases
Browse files Browse the repository at this point in the history
Epic: CRDB-8035

Release note: None
  • Loading branch information
arulajmani committed Oct 25, 2024
1 parent d603c40 commit e19016b
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 167 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ go_test(
"//pkg/kv/kvserver/split",
"//pkg/kv/kvserver/stateloader",
"//pkg/kv/kvserver/storeliveness",
"//pkg/kv/kvserver/storeliveness/storelivenesspb",
"//pkg/kv/kvserver/tenantrate",
"//pkg/kv/kvserver/tscache",
"//pkg/kv/kvserver/txnwait",
Expand Down
47 changes: 38 additions & 9 deletions pkg/kv/kvserver/client_raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -50,8 +52,11 @@ func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs {
}
}

// unreliableRaftHandler drops all Raft messages that are addressed to the
// specified rangeID, but lets all other messages through.
// unreliableRaftHandler drops all Raft messages that are addressed to the
// specified rangeID, but lets all other messages through.
//
// TODO(arul): consider adding a wrapper over unreliableRaftHandler and
// storeliveness.unreliableHandler, as both of these go hand in hand.
type unreliableRaftHandler struct {
name string
rangeID roachpb.RangeID
Expand Down Expand Up @@ -226,8 +231,8 @@ func (h *testClusterStoreRaftMessageHandler) HandleDelegatedSnapshot(
return store.HandleDelegatedSnapshot(ctx, req)
}

// testClusterPartitionedRange is a convenient abstraction to create a range on a node
// in a multiTestContext which can be partitioned and unpartitioned.
// testClusterPartitionedRange is a convenient abstraction to create a range on
// a node in a multiTestContext which can be partitioned and unpartitioned.
type testClusterPartitionedRange struct {
rangeID roachpb.RangeID
mu struct {
Expand Down Expand Up @@ -419,22 +424,34 @@ func (pr *testClusterPartitionedRange) extend(
func dropRaftMessagesFrom(
t *testing.T,
srv serverutils.TestServerInterface,
rangeID roachpb.RangeID,
desc roachpb.RangeDescriptor,
fromReplicaIDs []roachpb.ReplicaID,
cond *atomic.Bool,
) {
store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)

dropFrom := map[roachpb.ReplicaID]bool{}
dropFromStore := map[roachpb.StoreID]bool{}
for _, id := range fromReplicaIDs {
dropFrom[id] = true
rep, ok := desc.GetReplicaDescriptorByID(id)
if !ok {
t.Fatal("replica not found in range descriptor")
}
t.Logf("from store %d; adding replica %s to drop list", store.StoreID(), id)
t.Logf("from store %d; adding store %s to drop list", store.StoreID(), rep.StoreID)
dropFromStore[rep.StoreID] = true
}
shouldDrop := func(rID roachpb.RangeID, from roachpb.ReplicaID) bool {
return rID == rangeID && (cond == nil || cond.Load()) && dropFrom[from]
return rID == desc.RangeID && (cond == nil || cond.Load()) && dropFrom[from]
}
shouldDropFromStore := func(from roachpb.StoreID) bool {
return (cond == nil || cond.Load()) && dropFromStore[from]
}

store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)
srv.RaftTransport().(*kvserver.RaftTransport).ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{
rangeID: rangeID,
rangeID: desc.RangeID,
IncomingRaftMessageHandler: store,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropHB: func(hb *kvserverpb.RaftHeartbeat) bool {
Expand All @@ -448,6 +465,18 @@ func dropRaftMessagesFrom(
},
},
})
srv.StoreLivenessTransport().(*storeliveness.Transport).ListenMessages(store.StoreID(), &storeliveness.UnreliableHandler{
MessageHandler: store.StoreLivenessSupportManager(),
UnreliableHandlerFuncs: storeliveness.UnreliableHandlerFuncs{
DropStoreLivenessMsg: func(msg *storelivenesspb.Message) bool {
drop := shouldDropFromStore(msg.From.StoreID)
if drop {
t.Logf("dropping msg %s from store %d: to %d", msg.Type, msg.From.StoreID, msg.To.StoreID)
}
return drop
},
},
})
}

// getMapsDiff returns the difference between the values of corresponding
Expand Down
35 changes: 20 additions & 15 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6314,13 +6314,13 @@ func TestRaftPreVote(t *testing.T) {
// Configure the partition, but don't activate it yet.
if partial {
// Partition n3 away from n1, in both directions.
dropRaftMessagesFrom(t, tc.Servers[0], rangeID, []roachpb.ReplicaID{3}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[2], rangeID, []roachpb.ReplicaID{1}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[0], desc, []roachpb.ReplicaID{3}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[2], desc, []roachpb.ReplicaID{1}, &partitioned)
} else {
// Partition n3 away from both of n1 and n2, in both directions.
dropRaftMessagesFrom(t, tc.Servers[0], rangeID, []roachpb.ReplicaID{3}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[1], rangeID, []roachpb.ReplicaID{3}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[2], rangeID, []roachpb.ReplicaID{1, 2}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[0], desc, []roachpb.ReplicaID{3}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[1], desc, []roachpb.ReplicaID{3}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[2], desc, []roachpb.ReplicaID{1, 2}, &partitioned)
}

// Make sure the lease is on n1 and that everyone has applied it.
Expand Down Expand Up @@ -6492,6 +6492,10 @@ func TestRaftCheckQuorum(t *testing.T) {
kvserver.TransferExpirationLeasesFirstEnabled.Override(ctx, &st.SV, false)
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false)

if kvserver.RaftLeaderFortificationFractionEnabled.Get(&st.SV) > 0 && quiesce {
return // quiescence is not possible with leader leases
}

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Expand Down Expand Up @@ -6534,11 +6538,11 @@ func TestRaftCheckQuorum(t *testing.T) {
// Set up dropping of inbound messages on n1 from n2,n3, but don't
// activate it yet.
var partitioned atomic.Bool
dropRaftMessagesFrom(t, tc.Servers[0], desc.RangeID, []roachpb.ReplicaID{2, 3}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[0], desc, []roachpb.ReplicaID{2, 3}, &partitioned)
if symmetric {
// Drop outbound messages from n1 to n2,n3 too.
dropRaftMessagesFrom(t, tc.Servers[1], desc.RangeID, []roachpb.ReplicaID{1}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[2], desc.RangeID, []roachpb.ReplicaID{1}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[1], desc, []roachpb.ReplicaID{1}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[2], desc, []roachpb.ReplicaID{1}, &partitioned)
}

// Make sure the lease is on n1 and that everyone has applied it.
Expand Down Expand Up @@ -6584,9 +6588,9 @@ func TestRaftCheckQuorum(t *testing.T) {
require.Eventually(t, func() bool {
status := repl1.RaftStatus()
logStatus(status)
return status.RaftState == raftpb.StatePreCandidate
return status.RaftState != raftpb.StateLeader
}, 10*time.Second, 500*time.Millisecond)
t.Logf("n1 became pre-candidate")
t.Logf("n1 stepped down as the leader")

// In the case of a symmetric partition of a quiesced range, we have to
// wake up n2 to elect a new leader.
Expand Down Expand Up @@ -6614,9 +6618,10 @@ func TestRaftCheckQuorum(t *testing.T) {
require.Never(t, func() bool {
status := repl1.RaftStatus()
logStatus(status)
return status.RaftState != raftpb.StatePreCandidate
expState := status.Lead == raft.None && status.RaftState == raftpb.StateFollower
return !expState
}, 3*time.Second, 500*time.Millisecond)
t.Logf("n1 remains pre-candidate")
t.Logf("n1 remains a follower and hasn't heard from the leader")

// The existing leader shouldn't have been affected by n1's prevotes.
var finalStatus *raft.Status
Expand Down Expand Up @@ -6925,9 +6930,9 @@ func TestRaftPreVoteUnquiesceDeadLeader(t *testing.T) {

// Set up a complete partition for n1, but don't activate it yet.
var partitioned atomic.Bool
dropRaftMessagesFrom(t, tc.Servers[0], desc.RangeID, []roachpb.ReplicaID{2, 3}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[1], desc.RangeID, []roachpb.ReplicaID{1}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[2], desc.RangeID, []roachpb.ReplicaID{1}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[0], desc, []roachpb.ReplicaID{2, 3}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[1], desc, []roachpb.ReplicaID{1}, &partitioned)
dropRaftMessagesFrom(t, tc.Servers[2], desc, []roachpb.ReplicaID{1}, &partitioned)

// Make sure the lease is on n1 and that everyone has applied it.
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0))
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,9 +856,9 @@ func TestReplicaCircuitBreaker_Partial_Retry(t *testing.T) {
// requests and node liveness heartbeats still succeed.
partitioned := &atomic.Bool{}
partitioned.Store(true)
dropRaftMessagesFrom(t, n1, desc.RangeID, []roachpb.ReplicaID{3}, partitioned)
dropRaftMessagesFrom(t, n2, desc.RangeID, []roachpb.ReplicaID{3}, partitioned)
dropRaftMessagesFrom(t, n3, desc.RangeID, []roachpb.ReplicaID{1, 2}, partitioned)
dropRaftMessagesFrom(t, n1, desc, []roachpb.ReplicaID{3}, partitioned)
dropRaftMessagesFrom(t, n2, desc, []roachpb.ReplicaID{3}, partitioned)
dropRaftMessagesFrom(t, n3, desc, []roachpb.ReplicaID{1, 2}, partitioned)
t.Logf("partitioned n3 raft traffic from n1 and n2")

repl3.TripBreaker()
Expand Down Expand Up @@ -897,8 +897,8 @@ func TestReplicaCircuitBreaker_Partial_Retry(t *testing.T) {

// Also partition n1 and n2 away from each other, and trip their breakers. All
// nodes are now completely partitioned away from each other.
dropRaftMessagesFrom(t, n1, desc.RangeID, []roachpb.ReplicaID{2, 3}, partitioned)
dropRaftMessagesFrom(t, n2, desc.RangeID, []roachpb.ReplicaID{1, 3}, partitioned)
dropRaftMessagesFrom(t, n1, desc, []roachpb.ReplicaID{2, 3}, partitioned)
dropRaftMessagesFrom(t, n2, desc, []roachpb.ReplicaID{1, 3}, partitioned)

repl1.TripBreaker()
repl2.TripBreaker()
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_store_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
)

var raftLeaderFortificationFractionEnabled = settings.RegisterFloatSetting(
var RaftLeaderFortificationFractionEnabled = settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.raft.leader_fortification.fraction_enabled",
"controls the fraction of ranges for which the raft leader fortification "+
Expand Down Expand Up @@ -87,7 +87,7 @@ func (r *replicaRLockedStoreLiveness) SupportFromEnabled() bool {
if !r.store.storeLiveness.SupportFromEnabled(context.TODO()) {
return false
}
fracEnabled := raftLeaderFortificationFractionEnabled.Get(&r.store.ClusterSettings().SV)
fracEnabled := RaftLeaderFortificationFractionEnabled.Get(&r.store.ClusterSettings().SV)
fortifyEnabled := raftFortificationEnabledForRangeID(fracEnabled, r.RangeID)
return fortifyEnabled
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4098,6 +4098,13 @@ func (s *Store) unregisterLeaseholderByID(ctx context.Context, rangeID roachpb.R
}
}

// StoreLivenessSupportManager returns the Store's storeLiveness field as a
// concrete *storeliveness.SupportManager. The type assertion is unchecked, so
// this panics if the field does not hold a *storeliveness.SupportManager.
//
// TODO(arul): we probably need to construct the SupportManager a couple levels
// up, so that it's on the TestCluster or something, and then plumb it down to
// the Store.
func (s *Store) StoreLivenessSupportManager() *storeliveness.SupportManager {
	supportManager := s.storeLiveness.(*storeliveness.SupportManager)
	return supportManager
}

// getRootMemoryMonitorForKV returns a BytesMonitor to use for KV memory
// tracking.
func (s *Store) getRootMemoryMonitorForKV() *mon.BytesMonitor {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/storeliveness/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"support_manager.go",
"supporter_state.go",
"transport.go",
"unreliable_store_liveness_handler.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness",
visibility = ["//visibility:public"],
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvserver/storeliveness/unreliable_store_liveness_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storeliveness

import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"

// UnreliableHandlerFuncs holds the callbacks that an UnreliableHandler consults
// to decide whether an incoming message should be dropped.
type UnreliableHandlerFuncs struct {
// DropStoreLivenessMsg is called with each incoming message; returning true
// causes the message to be dropped instead of being handled.
DropStoreLivenessMsg func(*storelivenesspb.Message) bool
}

// UnreliableHandler is a MessageHandler that wraps another MessageHandler and
// drops every StoreLiveness message for which DropStoreLivenessMsg returns
// true, passing all other messages through to the wrapped handler.
type UnreliableHandler struct {
// Name is a label for the handler; it is not read by HandleMessage.
Name string
// MessageHandler is the wrapped handler that receives the messages that are
// not dropped.
MessageHandler
// UnreliableHandlerFuncs supplies the drop predicate.
UnreliableHandlerFuncs
}

// Compile-time check that *UnreliableHandler satisfies MessageHandler.
var _ MessageHandler = (*UnreliableHandler)(nil)

// HandleMessage implements the MessageHandler interface. It consults
// DropStoreLivenessMsg and, if the predicate returns true, silently discards
// msg and returns nil; otherwise msg is forwarded to the wrapped
// MessageHandler and that handler's result is returned.
//
// A nil DropStoreLivenessMsg is treated as "drop nothing", so a handler
// constructed without a predicate passes every message through rather than
// panicking on a nil function call.
func (h *UnreliableHandler) HandleMessage(msg *storelivenesspb.Message) error {
	if drop := h.DropStoreLivenessMsg; drop != nil && drop(msg) {
		return nil
	}
	return h.MessageHandler.HandleMessage(msg)
}
Loading

0 comments on commit e19016b

Please sign in to comment.