Merge #63999
63999: kvserver: add a concept of suspect nodes to the allocator r=lunevalex a=lunevalex

Touches #57093

A flaky node that goes in and out of liveness can cause
lease thrashing: live nodes steal leases from the failing
node, and once it heartbeats again the allocator moves the
leases back to maintain mean counts. This can be very
disruptive to the cluster as the leases ping-pong back and
forth. To prevent this problem we introduce the concept
of a suspect node in the allocator. When a node fails to
heartbeat liveness, it is considered suspect until it can reliably
heartbeat liveness for an extended period of time (60s by default).

Release note: None
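
The mechanism is driven by per-store liveness timestamps: the test changes below manipulate a store detail's lastAvailable/lastUnavailable fields and the TimeAfterStoreSuspect cluster setting. As a rough orientation, here is a minimal, self-contained Go sketch of that idea; the type, field, and function names are illustrative stand-ins, not the actual storePool code from this commit.

package main

import (
	"fmt"
	"time"
)

// storeDetail is a hypothetical, simplified stand-in for the per-store
// bookkeeping kept by the allocator's store pool; only the fields relevant
// to suspect tracking are shown here.
type storeDetail struct {
	lastAvailable   time.Time // last successful liveness heartbeat
	lastUnavailable time.Time // last time the node failed to heartbeat
}

// isSuspect reports whether the store should still be treated as suspect:
// it has been unavailable at least once, and less than suspectWindow has
// elapsed since that last unavailability.
func (d *storeDetail) isSuspect(now time.Time, suspectWindow time.Duration) bool {
	return !d.lastUnavailable.IsZero() && now.Sub(d.lastUnavailable) < suspectWindow
}

func main() {
	const suspectWindow = 60 * time.Second // mirrors the 60s default in the commit message
	start := time.Now()

	d := &storeDetail{lastAvailable: start}
	fmt.Println(d.isSuspect(start, suspectWindow)) // false: never seen unavailable

	// The node flaps: it misses a heartbeat, then comes back a few seconds later.
	d.lastUnavailable = start.Add(1 * time.Second)
	d.lastAvailable = start.Add(5 * time.Second)
	fmt.Println(d.isSuspect(start.Add(5*time.Second), suspectWindow)) // true: still inside the window

	// After the window passes without further flaps, the store is trusted again.
	fmt.Println(d.isSuspect(start.Add(90*time.Second), suspectWindow)) // false
}

In the real store pool the window is controlled by the TimeAfterStoreSuspect setting, which the new test below reads via TimeAfterStoreSuspect.Get(&storePool.st.SV).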

Co-authored-by: Alex Lunev <[email protected]>
craig[bot] and lunevalex committed Jun 2, 2021
2 parents 84499c9 + 04e7149 commit 0237afb
Showing 12 changed files with 684 additions and 112 deletions.
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/allocator.go
@@ -822,7 +822,7 @@ func (a *Allocator) allocateTargetFromList(
constraintsChecker,
existingReplicaSet,
a.storePool.getLocalitiesByStore(existingReplicaSet),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
a.storePool.isStoreReadyForRoutineReplicaTransfer,
allowMultipleReplsPerNode,
options,
)
@@ -1051,7 +1051,7 @@ func (a Allocator) rebalanceTarget(
replicaSetToRebalance,
replicasWithExcludedStores,
a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
a.storePool.isStoreReadyForRoutineReplicaTransfer,
options,
)

@@ -1253,7 +1253,7 @@ func (a *Allocator) TransferLeaseTarget(
checkCandidateFullness bool,
alwaysAllowDecisionWithoutStats bool,
) roachpb.ReplicaDescriptor {
sl, _, _ := a.storePool.getStoreList(storeFilterNone)
sl, _, _ := a.storePool.getStoreList(storeFilterSuspect)
sl = sl.filter(zone.Constraints)
sl = sl.filter(zone.VoterConstraints)
// The only thing we use the storeList for is for the lease mean across the
@@ -1398,7 +1398,7 @@ func (a *Allocator) ShouldTransferLease(
}
}

sl, _, _ := a.storePool.getStoreList(storeFilterNone)
sl, _, _ := a.storePool.getStoreList(storeFilterSuspect)
sl = sl.filter(zone.Constraints)
sl = sl.filter(zone.VoterConstraints)
log.VEventf(ctx, 3, "ShouldTransferLease (lease-holder=%d):\n%s", leaseStoreID, sl)
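The two hunks above change getStoreList from storeFilterNone to storeFilterSuspect in TransferLeaseTarget and ShouldTransferLease, so suspect stores are excluded from the store list whose mean lease count feeds those decisions. The sketch below illustrates the effect with hypothetical types and helpers; it is not the package's real getStoreList/storeFilter implementation, which is not part of this diff.

package main

import "fmt"

// storeFilter is a hypothetical stand-in for the allocator's store-list filter.
type storeFilter int

const (
	storeFilterNone    storeFilter = iota // keep every live store
	storeFilterSuspect                    // additionally drop suspect stores
)

type store struct {
	id      int
	suspect bool
	leases  int
}

// filteredMeanLeases returns the stores passing the filter and their mean
// lease count, which is what lease-transfer heuristics compare against.
func filteredMeanLeases(stores []store, f storeFilter) ([]store, float64) {
	var kept []store
	total := 0
	for _, s := range stores {
		if f == storeFilterSuspect && s.suspect {
			continue // a flapping store should not drag the mean down
		}
		kept = append(kept, s)
		total += s.leases
	}
	if len(kept) == 0 {
		return kept, 0
	}
	return kept, float64(total) / float64(len(kept))
}

func main() {
	stores := []store{
		{id: 1, suspect: true, leases: 0},
		{id: 2, suspect: false, leases: 20},
		{id: 3, suspect: false, leases: 20},
	}
	_, mean := filteredMeanLeases(stores, storeFilterNone)
	fmt.Printf("%.1f\n", mean) // 13.3: stores 2 and 3 look overloaded relative to the suspect store
	_, mean = filteredMeanLeases(stores, storeFilterSuspect)
	fmt.Printf("%.1f\n", mean) // 20.0: no pressure to shed leases toward store 1
}

With the suspect store filtered out, the remaining stores no longer look overloaded relative to a near-empty flapping store, so the allocator feels no pressure to move leases toward it; the TestAllocatorShouldTransferSuspected test added below exercises exactly this behavior.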
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/allocator_scorer.go
@@ -418,7 +418,7 @@ func rankedCandidateListForAllocation(
constraintsCheck constraintsCheckFn,
existingReplicas []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
isStoreValidForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool,
allowMultipleReplsPerNode bool,
options scorerOptions,
) candidateList {
@@ -434,7 +434,7 @@
if !allowMultipleReplsPerNode && nodeHasReplica(s.Node.NodeID, existingReplTargets) {
continue
}
if !isNodeValidForRoutineReplicaTransfer(ctx, s.Node.NodeID) {
if !isStoreValidForRoutineReplicaTransfer(ctx, s.StoreID) {
log.VEventf(
ctx,
3,
@@ -561,7 +561,7 @@ func rankedCandidateListForRebalancing(
rebalanceConstraintsChecker rebalanceConstraintsCheckFn,
existingReplicasForType, replicasOnExemptedStores []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
isStoreValidForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool,
options scorerOptions,
) []rebalanceOptions {
// 1. Determine whether existing replicas are valid and/or necessary.
@@ -643,7 +643,7 @@ func rankedCandidateListForRebalancing(
for _, store := range allStores.stores {
// Ignore any stores on dead nodes or stores that contain any of the
// replicas within `replicasOnExemptedStores`.
if !isNodeValidForRoutineReplicaTransfer(ctx, store.Node.NodeID) {
if !isStoreValidForRoutineReplicaTransfer(ctx, store.StoreID) {
log.VEventf(
ctx,
3,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator_scorer_test.go
@@ -1225,7 +1225,7 @@ func TestShouldRebalanceDiversity(t *testing.T) {
replicas,
nil,
existingStoreLocalities,
func(context.Context, roachpb.NodeID) bool { return true },
func(context.Context, roachpb.StoreID) bool { return true },
options,
)
actual := len(targets) > 0
158 changes: 147 additions & 11 deletions pkg/kv/kvserver/allocator_test.go
@@ -401,6 +401,7 @@ func mockStorePool(
deadStoreIDs []roachpb.StoreID,
decommissioningStoreIDs []roachpb.StoreID,
decommissionedStoreIDs []roachpb.StoreID,
suspectedStoreIDs []roachpb.StoreID,
) {
storePool.detailsMu.Lock()
defer storePool.detailsMu.Unlock()
@@ -448,6 +449,17 @@ func mockStorePool(
}
}

for _, storeID := range suspectedStoreIDs {
liveNodeSet[roachpb.NodeID(storeID)] = livenesspb.NodeLivenessStatus_LIVE
detail := storePool.getStoreDetailLocked(storeID)
detail.lastAvailable = storePool.clock.Now().GoTime()
detail.lastUnavailable = storePool.clock.Now().GoTime()
detail.desc = &roachpb.StoreDescriptor{
StoreID: storeID,
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)},
}
}

// Set the node liveness function using the set we constructed.
storePool.nodeLivenessFn =
func(nodeID roachpb.NodeID, now time.Time, threshold time.Duration) livenesspb.NodeLivenessStatus {
@@ -1047,6 +1059,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) {
[]roachpb.StoreID{7, 8},
nil,
nil,
nil,
)

ranges := func(rangeCount int32) roachpb.StoreCapacity {
@@ -1844,6 +1857,56 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
}
}

func TestAllocatorShouldTransferSuspected(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
stopper, g, clock, storePool, nl := createTestStorePool(
TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
return 0, true
})
defer stopper.Stop(context.Background())

var stores []*roachpb.StoreDescriptor
// Structure the capacities so that a lease transfer is desirable only when store 1 is available.
capacity := []int32{0, 20, 20}
for i := 1; i <= 3; i++ {
stores = append(stores, &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
Capacity: roachpb.StoreCapacity{LeaseCount: capacity[i-1]},
})
}
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)

assertShouldTransferLease := func(expected bool) {
t.Helper()
result := a.ShouldTransferLease(
context.Background(),
zonepb.EmptyCompleteZoneConfig(),
replicas(1, 2, 3),
2,
nil, /* replicaStats */
)
require.Equal(t, expected, result)
}
timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&storePool.st.SV)
// Based on capacity, node 1 is a desirable lease-transfer target.
assertShouldTransferLease(true)
// Flip node 1 to unavailable; there should be no lease transfer now.
nl.setNodeStatus(1, livenesspb.NodeLivenessStatus_UNAVAILABLE)
assertShouldTransferLease(false)
// Set the node back to live; it is still suspect, so no lease transfer is expected.
nl.setNodeStatus(1, livenesspb.NodeLivenessStatus_LIVE)
assertShouldTransferLease(false)
// Wait out the suspect store timeout and verify that lease transfers resume.
clock.Increment(timeAfterStoreSuspect.Nanoseconds() + time.Millisecond.Nanoseconds())
assertShouldTransferLease(true)
}

func TestAllocatorLeasePreferences(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -2671,10 +2734,9 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints)
rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints)

a.storePool.isNodeReadyForRoutineReplicaTransfer = func(_ context.Context, n roachpb.NodeID) bool {
a.storePool.isStoreReadyForRoutineReplicaTransfer = func(_ context.Context, storeID roachpb.StoreID) bool {
for _, s := range tc.excluded {
// NodeID match StoreIDs here, so this comparison is valid.
if roachpb.NodeID(s) == n {
if s == storeID {
return false
}
}
@@ -2688,7 +2750,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
allocationConstraintsChecker,
existingRepls,
a.storePool.getLocalitiesByStore(existingRepls),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
a.storePool.isStoreReadyForRoutineReplicaTransfer,
false, /* allowMultipleReplsPerNode */
a.scorerOptions(),
)
@@ -2708,7 +2770,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
existingRepls,
nil,
a.storePool.getLocalitiesByStore(existingRepls),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
a.storePool.isStoreReadyForRoutineReplicaTransfer,
a.scorerOptions(),
)
if len(tc.expected) > 0 {
@@ -3032,7 +3094,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
checkFn,
existingRepls,
a.storePool.getLocalitiesByStore(existingRepls),
func(context.Context, roachpb.NodeID) bool { return true },
func(context.Context, roachpb.StoreID) bool { return true },
false, /* allowMultipleReplsPerNode */
a.scorerOptions(),
)
@@ -4274,7 +4336,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
existingRepls,
nil,
a.storePool.getLocalitiesByStore(existingRepls),
func(context.Context, roachpb.NodeID) bool { return true },
func(context.Context, roachpb.StoreID) bool { return true },
a.scorerOptions(),
)
match := true
@@ -5395,6 +5457,7 @@ func TestAllocatorComputeAction(t *testing.T) {
[]roachpb.StoreID{6, 7},
nil,
nil,
nil,
)

lastPriority := float64(999999999)
@@ -5497,7 +5560,80 @@ func TestAllocatorComputeActionRemoveDead(t *testing.T) {
defer stopper.Stop(ctx)

for i, tcase := range testCases {
mockStorePool(sp, tcase.live, nil, tcase.dead, nil, nil)
mockStorePool(sp, tcase.live, nil, tcase.dead, nil, nil, nil)
action, _ := a.ComputeAction(ctx, &zone, &tcase.desc)
if tcase.expectedAction != action {
t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action)
}
}
}

func TestAllocatorComputeActionSuspect(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

zone := zonepb.ZoneConfig{
NumReplicas: proto.Int32(3),
}
threeReplDesc := roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{
{
StoreID: 1,
NodeID: 1,
ReplicaID: 1,
},
{
StoreID: 2,
NodeID: 2,
ReplicaID: 2,
},
{
StoreID: 3,
NodeID: 3,
ReplicaID: 3,
},
},
}

testCases := []struct {
desc roachpb.RangeDescriptor
live []roachpb.StoreID
suspect []roachpb.StoreID
expectedAction AllocatorAction
}{
{
desc: threeReplDesc,
live: []roachpb.StoreID{1, 2, 3},
suspect: nil,
expectedAction: AllocatorConsiderRebalance,
},
{
desc: threeReplDesc,
live: []roachpb.StoreID{1, 2},
suspect: []roachpb.StoreID{3},
expectedAction: AllocatorConsiderRebalance,
},
{
desc: threeReplDesc,
live: []roachpb.StoreID{1, 2, 4},
suspect: []roachpb.StoreID{3},
expectedAction: AllocatorConsiderRebalance,
},
// Needs three replicas, two are suspect (i.e. the range lacks a quorum).
{
desc: threeReplDesc,
live: []roachpb.StoreID{1, 4},
suspect: []roachpb.StoreID{2, 3},
expectedAction: AllocatorRangeUnavailable,
},
}

stopper, _, sp, a, _ := createTestAllocator(10, false /* deterministic */)
ctx := context.Background()
defer stopper.Stop(ctx)

for i, tcase := range testCases {
mockStorePool(sp, tcase.live, nil, nil, nil, nil, tcase.suspect)
action, _ := a.ComputeAction(ctx, &zone, &tcase.desc)
if tcase.expectedAction != action {
t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action)
@@ -5788,7 +5924,7 @@ func TestAllocatorComputeActionDecommission(t *testing.T) {
defer stopper.Stop(ctx)

for i, tcase := range testCases {
mockStorePool(sp, tcase.live, nil, tcase.dead, tcase.decommissioning, tcase.decommissioned)
mockStorePool(sp, tcase.live, nil, tcase.dead, tcase.decommissioning, tcase.decommissioned, nil)
action, _ := a.ComputeAction(ctx, &tcase.zone, &tcase.desc)
if tcase.expectedAction != action {
t.Errorf("Test case %d expected action %s, got action %s", i, tcase.expectedAction, action)
@@ -5827,7 +5963,7 @@ func TestAllocatorRemoveLearner(t *testing.T) {
ctx := context.Background()
defer stopper.Stop(ctx)
live, dead := []roachpb.StoreID{1, 2}, []roachpb.StoreID{3}
mockStorePool(sp, live, nil, dead, nil, nil)
mockStorePool(sp, live, nil, dead, nil, nil, nil)
action, _ := a.ComputeAction(ctx, &zone, &rangeWithLearnerDesc)
require.Equal(t, AllocatorRemoveLearner, action)
}
@@ -6028,7 +6164,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) {
t.Run(prefixKey.String(), func(t *testing.T) {
numNodes = len(c.storeList) - len(c.decommissioning)
mockStorePool(sp, c.live, c.unavailable, c.dead,
c.decommissioning, []roachpb.StoreID{})
c.decommissioning, nil, nil)
desc := makeDescriptor(c.storeList)
desc.EndKey = prefixKey
