From 04e7149928cd9e5e41a7c6f7c3ea8466677aeec0 Mon Sep 17 00:00:00 2001
From: Alex Lunev
Date: Sun, 18 Apr 2021 22:15:01 -0700
Subject: [PATCH] kvserver: add a concept of suspect nodes to the allocator

Touches #57093

A flaky node that goes in and out of liveness can cause lease
thrashing, as live nodes steal leases from the failing node and then,
when it heartbeats again, the allocator puts them back to maintain mean
counts. This could be very disruptive to the cluster as the leases
ping-pong back and forth. To prevent this problem, we introduce a new
concept of a suspect node to the allocator. When a node fails to
heartbeat liveness, it's considered suspect until it can reliably
heartbeat liveness for an extended period of time (30s by default).

Release note: None
---
 pkg/kv/kvserver/allocator.go                  |   8 +-
 pkg/kv/kvserver/allocator_scorer.go           |   8 +-
 pkg/kv/kvserver/allocator_scorer_test.go      |   2 +-
 pkg/kv/kvserver/allocator_test.go             | 158 ++++++++++++-
 pkg/kv/kvserver/client_lease_test.go          | 215 ++++++++++++++++++
 .../liveness/livenesspb/liveness.pb.go        |  79 +++----
 .../liveness/livenesspb/liveness.proto        |   4 +-
 pkg/kv/kvserver/store_pool.go                 | 187 ++++++++++++---
 pkg/kv/kvserver/store_pool_test.go            | 124 +++++++++-
 pkg/kv/kvserver/store_rebalancer.go           |   4 +-
 pkg/kv/kvserver/store_rebalancer_test.go      |   5 +-
 .../containers/nodesOverview/index.tsx        |   2 +
 12 files changed, 684 insertions(+), 112 deletions(-)

diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 93476c3390e6..54965812c0be 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -822,7 +822,7 @@ func (a *Allocator) allocateTargetFromList(
 		constraintsChecker,
 		existingReplicaSet,
 		a.storePool.getLocalitiesByStore(existingReplicaSet),
-		a.storePool.isNodeReadyForRoutineReplicaTransfer,
+		a.storePool.isStoreReadyForRoutineReplicaTransfer,
 		allowMultipleReplsPerNode,
 		options,
 	)
@@ -1051,7 +1051,7 @@ func (a Allocator) rebalanceTarget(
 		replicaSetToRebalance,
 		replicasWithExcludedStores,
 		a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc),
-		a.storePool.isNodeReadyForRoutineReplicaTransfer,
+		a.storePool.isStoreReadyForRoutineReplicaTransfer,
 		options,
 	)
 
@@ -1253,7 +1253,7 @@ func (a *Allocator) TransferLeaseTarget(
 	checkCandidateFullness bool,
 	alwaysAllowDecisionWithoutStats bool,
 ) roachpb.ReplicaDescriptor {
-	sl, _, _ := a.storePool.getStoreList(storeFilterNone)
+	sl, _, _ := a.storePool.getStoreList(storeFilterSuspect)
 	sl = sl.filter(zone.Constraints)
 	sl = sl.filter(zone.VoterConstraints)
 	// The only thing we use the storeList for is for the lease mean across the
@@ -1398,7 +1398,7 @@ func (a *Allocator) ShouldTransferLease(
 		}
 	}
 
-	sl, _, _ := a.storePool.getStoreList(storeFilterNone)
+	sl, _, _ := a.storePool.getStoreList(storeFilterSuspect)
 	sl = sl.filter(zone.Constraints)
 	sl = sl.filter(zone.VoterConstraints)
 	log.VEventf(ctx, 3, "ShouldTransferLease (lease-holder=%d):\n%s", leaseStoreID, sl)
diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go
index 79ec4515d5f7..2b9e1272b324 100644
--- a/pkg/kv/kvserver/allocator_scorer.go
+++ b/pkg/kv/kvserver/allocator_scorer.go
@@ -418,7 +418,7 @@ func rankedCandidateListForAllocation(
 	constraintsCheck constraintsCheckFn,
 	existingReplicas []roachpb.ReplicaDescriptor,
 	existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
-	isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
+	isStoreValidForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool,
 	allowMultipleReplsPerNode bool,
options scorerOptions, ) candidateList { @@ -434,7 +434,7 @@ func rankedCandidateListForAllocation( if !allowMultipleReplsPerNode && nodeHasReplica(s.Node.NodeID, existingReplTargets) { continue } - if !isNodeValidForRoutineReplicaTransfer(ctx, s.Node.NodeID) { + if !isStoreValidForRoutineReplicaTransfer(ctx, s.StoreID) { log.VEventf( ctx, 3, @@ -561,7 +561,7 @@ func rankedCandidateListForRebalancing( rebalanceConstraintsChecker rebalanceConstraintsCheckFn, existingReplicasForType, replicasOnExemptedStores []roachpb.ReplicaDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, - isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool, + isStoreValidForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool, options scorerOptions, ) []rebalanceOptions { // 1. Determine whether existing replicas are valid and/or necessary. @@ -643,7 +643,7 @@ func rankedCandidateListForRebalancing( for _, store := range allStores.stores { // Ignore any stores on dead nodes or stores that contain any of the // replicas within `replicasOnExemptedStores`. - if !isNodeValidForRoutineReplicaTransfer(ctx, store.Node.NodeID) { + if !isStoreValidForRoutineReplicaTransfer(ctx, store.StoreID) { log.VEventf( ctx, 3, diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go index d81d2e1e6d3f..fce8aae67c55 100644 --- a/pkg/kv/kvserver/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator_scorer_test.go @@ -1225,7 +1225,7 @@ func TestShouldRebalanceDiversity(t *testing.T) { replicas, nil, existingStoreLocalities, - func(context.Context, roachpb.NodeID) bool { return true }, + func(context.Context, roachpb.StoreID) bool { return true }, options, ) actual := len(targets) > 0 diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index bc107d5cedb7..fe9bdd7ccc9a 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -401,6 +401,7 @@ func mockStorePool( deadStoreIDs []roachpb.StoreID, decommissioningStoreIDs []roachpb.StoreID, decommissionedStoreIDs []roachpb.StoreID, + suspectedStoreIDs []roachpb.StoreID, ) { storePool.detailsMu.Lock() defer storePool.detailsMu.Unlock() @@ -448,6 +449,17 @@ func mockStorePool( } } + for _, storeID := range suspectedStoreIDs { + liveNodeSet[roachpb.NodeID(storeID)] = livenesspb.NodeLivenessStatus_LIVE + detail := storePool.getStoreDetailLocked(storeID) + detail.lastAvailable = storePool.clock.Now().GoTime() + detail.lastUnavailable = storePool.clock.Now().GoTime() + detail.desc = &roachpb.StoreDescriptor{ + StoreID: storeID, + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)}, + } + } + // Set the node liveness function using the set we constructed. 
storePool.nodeLivenessFn = func(nodeID roachpb.NodeID, now time.Time, threshold time.Duration) livenesspb.NodeLivenessStatus { @@ -1047,6 +1059,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { []roachpb.StoreID{7, 8}, nil, nil, + nil, ) ranges := func(rangeCount int32) roachpb.StoreCapacity { @@ -1844,6 +1857,56 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { } } +func TestAllocatorShouldTransferSuspected(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + stopper, g, clock, storePool, nl := createTestStorePool( + TestTimeUntilStoreDeadOff, true, /* deterministic */ + func() int { return 10 }, /* nodeCount */ + livenesspb.NodeLivenessStatus_LIVE) + a := MakeAllocator(storePool, func(string) (time.Duration, bool) { + return 0, true + }) + defer stopper.Stop(context.Background()) + + var stores []*roachpb.StoreDescriptor + // Structure the capacity so we only get the desire to move when store 1 is around. + capacity := []int32{0, 20, 20} + for i := 1; i <= 3; i++ { + stores = append(stores, &roachpb.StoreDescriptor{ + StoreID: roachpb.StoreID(i), + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, + Capacity: roachpb.StoreCapacity{LeaseCount: capacity[i-1]}, + }) + } + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(stores, t) + + assertShouldTransferLease := func(expected bool) { + t.Helper() + result := a.ShouldTransferLease( + context.Background(), + zonepb.EmptyCompleteZoneConfig(), + replicas(1, 2, 3), + 2, + nil, /* replicaStats */ + ) + require.Equal(t, expected, result) + } + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&storePool.st.SV) + // Based on capacity node 1 is desirable. + assertShouldTransferLease(true) + // Flip node 1 to unavailable, there should be no lease transfer now. + nl.setNodeStatus(1, livenesspb.NodeLivenessStatus_UNAVAILABLE) + assertShouldTransferLease(false) + // Set node back to live, but it's still suspected so not lease transfer expected. + nl.setNodeStatus(1, livenesspb.NodeLivenessStatus_LIVE) + assertShouldTransferLease(false) + // Wait out the suspected store timeout, verify that lease transfers are back. + clock.Increment(timeAfterStoreSuspect.Nanoseconds() + time.Millisecond.Nanoseconds()) + assertShouldTransferLease(true) +} + func TestAllocatorLeasePreferences(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2671,10 +2734,9 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { removalConstraintsChecker := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints) rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(analyzed, constraint.EmptyAnalyzedConstraints) - a.storePool.isNodeReadyForRoutineReplicaTransfer = func(_ context.Context, n roachpb.NodeID) bool { + a.storePool.isStoreReadyForRoutineReplicaTransfer = func(_ context.Context, storeID roachpb.StoreID) bool { for _, s := range tc.excluded { - // NodeID match StoreIDs here, so this comparison is valid. 
- if roachpb.NodeID(s) == n { + if s == storeID { return false } } @@ -2688,7 +2750,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { allocationConstraintsChecker, existingRepls, a.storePool.getLocalitiesByStore(existingRepls), - a.storePool.isNodeReadyForRoutineReplicaTransfer, + a.storePool.isStoreReadyForRoutineReplicaTransfer, false, /* allowMultipleReplsPerNode */ a.scorerOptions(), ) @@ -2708,7 +2770,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { existingRepls, nil, a.storePool.getLocalitiesByStore(existingRepls), - a.storePool.isNodeReadyForRoutineReplicaTransfer, + a.storePool.isStoreReadyForRoutineReplicaTransfer, a.scorerOptions(), ) if len(tc.expected) > 0 { @@ -3032,7 +3094,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) { checkFn, existingRepls, a.storePool.getLocalitiesByStore(existingRepls), - func(context.Context, roachpb.NodeID) bool { return true }, + func(context.Context, roachpb.StoreID) bool { return true }, false, /* allowMultipleReplsPerNode */ a.scorerOptions(), ) @@ -4274,7 +4336,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { existingRepls, nil, a.storePool.getLocalitiesByStore(existingRepls), - func(context.Context, roachpb.NodeID) bool { return true }, + func(context.Context, roachpb.StoreID) bool { return true }, a.scorerOptions(), ) match := true @@ -5395,6 +5457,7 @@ func TestAllocatorComputeAction(t *testing.T) { []roachpb.StoreID{6, 7}, nil, nil, + nil, ) lastPriority := float64(999999999) @@ -5497,7 +5560,80 @@ func TestAllocatorComputeActionRemoveDead(t *testing.T) { defer stopper.Stop(ctx) for i, tcase := range testCases { - mockStorePool(sp, tcase.live, nil, tcase.dead, nil, nil) + mockStorePool(sp, tcase.live, nil, tcase.dead, nil, nil, nil) + action, _ := a.ComputeAction(ctx, &zone, &tcase.desc) + if tcase.expectedAction != action { + t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) + } + } +} + +func TestAllocatorComputeActionSuspect(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + zone := zonepb.ZoneConfig{ + NumReplicas: proto.Int32(3), + } + threeReplDesc := roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + }, + { + StoreID: 3, + NodeID: 3, + ReplicaID: 3, + }, + }, + } + + testCases := []struct { + desc roachpb.RangeDescriptor + live []roachpb.StoreID + suspect []roachpb.StoreID + expectedAction AllocatorAction + }{ + { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 2, 3}, + suspect: nil, + expectedAction: AllocatorConsiderRebalance, + }, + { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 2}, + suspect: []roachpb.StoreID{3}, + expectedAction: AllocatorConsiderRebalance, + }, + { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 2, 4}, + suspect: []roachpb.StoreID{3}, + expectedAction: AllocatorConsiderRebalance, + }, + // Needs three replicas, two are suspect (i.e. the range lacks a quorum). 
+ { + desc: threeReplDesc, + live: []roachpb.StoreID{1, 4}, + suspect: []roachpb.StoreID{2, 3}, + expectedAction: AllocatorRangeUnavailable, + }, + } + + stopper, _, sp, a, _ := createTestAllocator(10, false /* deterministic */) + ctx := context.Background() + defer stopper.Stop(ctx) + + for i, tcase := range testCases { + mockStorePool(sp, tcase.live, nil, nil, nil, nil, tcase.suspect) action, _ := a.ComputeAction(ctx, &zone, &tcase.desc) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action) @@ -5788,7 +5924,7 @@ func TestAllocatorComputeActionDecommission(t *testing.T) { defer stopper.Stop(ctx) for i, tcase := range testCases { - mockStorePool(sp, tcase.live, nil, tcase.dead, tcase.decommissioning, tcase.decommissioned) + mockStorePool(sp, tcase.live, nil, tcase.dead, tcase.decommissioning, tcase.decommissioned, nil) action, _ := a.ComputeAction(ctx, &tcase.zone, &tcase.desc) if tcase.expectedAction != action { t.Errorf("Test case %d expected action %s, got action %s", i, tcase.expectedAction, action) @@ -5827,7 +5963,7 @@ func TestAllocatorRemoveLearner(t *testing.T) { ctx := context.Background() defer stopper.Stop(ctx) live, dead := []roachpb.StoreID{1, 2}, []roachpb.StoreID{3} - mockStorePool(sp, live, nil, dead, nil, nil) + mockStorePool(sp, live, nil, dead, nil, nil, nil) action, _ := a.ComputeAction(ctx, &zone, &rangeWithLearnerDesc) require.Equal(t, AllocatorRemoveLearner, action) } @@ -6028,7 +6164,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { t.Run(prefixKey.String(), func(t *testing.T) { numNodes = len(c.storeList) - len(c.decommissioning) mockStorePool(sp, c.live, c.unavailable, c.dead, - c.decommissioning, []roachpb.StoreID{}) + c.decommissioning, nil, nil) desc := makeDescriptor(c.storeList) desc.EndKey = prefixKey diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index 79eab5465f8f..b48fd7909b4a 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -13,6 +13,7 @@ package kvserver_test import ( "context" "fmt" + "math" "runtime" "strconv" "strings" @@ -34,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -733,3 +735,216 @@ func TestLeasePreferencesDuringOutage(t *testing.T) { // make sure we see the eu node as a lease holder in the second to last position. require.Equal(t, tc.Target(0).NodeID, history[len(history)-2].Replica.NodeID) } + +// This test verifies that when a node starts flapping its liveness, all leases +// move off that node and it does not get any leases back until it heartbeats +// liveness and waits out the server.time_after_store_suspect interval. +func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // This is a hefty test, so we skip it under short and race. + skip.UnderShort(t) + skip.UnderRace(t) + + // We introduce constraints so that only n2,n3,n4 are considered when we + // determine if we should transfer leases based on capacity. This makes sure n2 + // looks desirable as a lease transfer target once it stops being suspect. 
+ zcfg := zonepb.DefaultZoneConfig() + zcfg.Constraints = []zonepb.ConstraintsConjunction{ + { + NumReplicas: 3, + Constraints: []zonepb.Constraint{ + {Type: zonepb.Constraint_REQUIRED, Key: "region", Value: "us-west"}, + }, + }, + } + locality := func(region string) roachpb.Locality { + return roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: region}, + }, + } + } + localities := []roachpb.Locality{ + locality("us-east"), + locality("us-west"), + locality("us-west"), + locality("us-west"), + } + // Speed up lease transfers. + stickyRegistry := server.NewStickyInMemEnginesRegistry() + defer stickyRegistry.CloseAllStickyInMemEngines() + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + serverArgs := make(map[int]base.TestServerArgs) + numNodes := 4 + for i := 0; i < numNodes; i++ { + serverArgs[i] = base.TestServerArgs{ + Locality: localities[i], + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + DefaultZoneConfigOverride: &zcfg, + StickyEngineRegistry: stickyRegistry, + }, + }, + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + } + } + tc := testcluster.StartTestCluster(t, numNodes, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: serverArgs, + }) + defer tc.Stopper().Stop(ctx) + + // We are not going to have stats, so disable this so we just rely on + // the store means. + _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.allocator.load_based_lease_rebalancing.enabled = 'false'`) + require.NoError(t, err) + + _, rhsDesc := tc.SplitRangeOrFatal(t, keys.UserTableDataMin) + tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2, 3)...) + tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0), tc.Target(1)) + + startKeys := make([]roachpb.Key, 20) + startKeys[0] = rhsDesc.StartKey.AsRawKey() + for i := 1; i < 20; i++ { + startKeys[i] = startKeys[i-1].Next() + tc.SplitRangeOrFatal(t, startKeys[i]) + require.NoError(t, tc.WaitForVoters(startKeys[i], tc.Targets(1, 2, 3)...)) + } + + leaseOnNonSuspectStores := func(key roachpb.Key) error { + var repl *kvserver.Replica + for _, i := range []int{2, 3} { + repl = tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key)) + if repl.OwnsValidLease(ctx, tc.Servers[i].Clock().NowAsClockTimestamp()) { + return nil + } + } + return errors.Errorf("Expected no lease on server 1 for %s", repl) + } + + allLeasesOnNonSuspectStores := func() error { + for _, key := range startKeys { + if err := leaseOnNonSuspectStores(key); err != nil { + return err + } + } + return nil + } + + // Make sure that all store pools have seen liveness heartbeats from everyone. 
+	testutils.SucceedsSoon(t, func() error {
+		for i := range tc.Servers {
+			for j := range tc.Servers {
+				live, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.IsLive(tc.Target(j).StoreID)
+				if err != nil {
+					return err
+				}
+				if !live {
+					return errors.Errorf("Expected server %d to be live on server %d", j, i)
+				}
+			}
+		}
+		return nil
+	})
+
+	for _, key := range startKeys {
+		repl := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(key))
+		tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(1))
+		testutils.SucceedsSoon(t, func() error {
+			if !repl.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) {
+				return errors.Errorf("Expected lease to transfer to server 1 for replica %s", repl)
+			}
+			return nil
+		})
+	}
+
+	heartbeat := func(servers ...int) {
+		for _, i := range servers {
+			testutils.SucceedsSoon(t, tc.Servers[i].HeartbeatNodeLiveness)
+		}
+	}
+
+	// The node has to lose both its raft leadership and liveness for leases to
+	// move, so the best way to simulate that right now is to just kill the node.
+	tc.StopServer(1)
+	// We move the time, so that server 1 can start failing its liveness.
+	livenessDuration, _ := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().RaftConfig.NodeLivenessDurations()
+
+	// We don't want the other stores to lose liveness while we move the time, so
+	// tick the time one second at a time and make sure they heartbeat.
+	for i := 0; i < int(math.Ceil(livenessDuration.Seconds())+1); i++ {
+		manualClock.Increment(time.Second.Nanoseconds())
+		heartbeat(0, 2, 3)
+	}
+
+	testutils.SucceedsSoon(t, func() error {
+		for _, i := range []int{2, 3} {
+			suspect, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.IsSuspect(tc.Target(1).StoreID)
+			if err != nil {
+				return err
+			}
+			if !suspect {
+				return errors.Errorf("Expected server 1 to be suspect on server %d", i)
+			}
+		}
+		return nil
+	})
+
+	runThroughTheReplicateQueue := func(key roachpb.Key) {
+		for _, i := range []int{2, 3} {
+			repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key))
+			require.NotNil(t, repl)
+			_, _, enqueueError := tc.GetFirstStoreFromServer(t, i).
+				ManuallyEnqueue(ctx, "replicate", repl, true)
+			require.NoError(t, enqueueError)
+		}
+	}
+
+	for _, key := range startKeys {
+		testutils.SucceedsSoon(t, func() error {
+			runThroughTheReplicateQueue(key)
+			return leaseOnNonSuspectStores(key)
+		})
+	}
+	require.NoError(t, tc.RestartServer(1))
+
+	for i := 0; i < int(math.Ceil(livenessDuration.Seconds())); i++ {
+		manualClock.Increment(time.Second.Nanoseconds())
+		heartbeat(0, 2, 3)
+	}
+	// Force all the replication queues; server 1 is still suspect, so it should not pick up any leases.
+	for _, key := range startKeys {
+		runThroughTheReplicateQueue(key)
+	}
+	testutils.SucceedsSoon(t, allLeasesOnNonSuspectStores)
+	// Wait out the suspect time.
+	suspectDuration := kvserver.TimeAfterStoreSuspect.Get(&tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().Settings.SV)
+	for i := 0; i < int(math.Ceil(suspectDuration.Seconds())); i++ {
+		manualClock.Increment(time.Second.Nanoseconds())
+		heartbeat(0, 1, 2, 3)
+	}
+
+	testutils.SucceedsSoon(t, func() error {
+		// Server 1 should get some leases back as it's no longer suspect.
+ for _, key := range startKeys { + runThroughTheReplicateQueue(key) + } + for _, key := range startKeys { + repl := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(key)) + if repl.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) { + return nil + } + } + return errors.Errorf("Expected server 1 to have at lease 1 lease.") + }) +} diff --git a/pkg/kv/kvserver/liveness/livenesspb/liveness.pb.go b/pkg/kv/kvserver/liveness/livenesspb/liveness.pb.go index 7c73a03d664c..69dbca8c5b18 100644 --- a/pkg/kv/kvserver/liveness/livenesspb/liveness.pb.go +++ b/pkg/kv/kvserver/liveness/livenesspb/liveness.pb.go @@ -110,8 +110,6 @@ const ( // UNAVAILABLE indicates that the node is unavailable - it has not updated its // liveness record recently enough to be considered live, but has not been // unavailable long enough to be considered dead. - // UNAVAILABLE is also reported for nodes whose descriptor is marked - // as draining. NodeLivenessStatus_UNAVAILABLE NodeLivenessStatus = 2 // LIVE indicates a live node. NodeLivenessStatus_LIVE NodeLivenessStatus = 3 @@ -120,6 +118,8 @@ const ( // DECOMMISSIONED indicates a node that has finished the decommissioning // process. NodeLivenessStatus_DECOMMISSIONED NodeLivenessStatus = 5 + // DRAINING indicates a node that is in the process of draining. + NodeLivenessStatus_DRAINING NodeLivenessStatus = 6 ) var NodeLivenessStatus_name = map[int32]string{ @@ -129,6 +129,7 @@ var NodeLivenessStatus_name = map[int32]string{ 3: "NODE_STATUS_LIVE", 4: "NODE_STATUS_DECOMMISSIONING", 5: "NODE_STATUS_DECOMMISSIONED", + 6: "NODE_STATUS_DRAINING", } var NodeLivenessStatus_value = map[string]int32{ @@ -138,6 +139,7 @@ var NodeLivenessStatus_value = map[string]int32{ "NODE_STATUS_LIVE": 3, "NODE_STATUS_DECOMMISSIONING": 4, "NODE_STATUS_DECOMMISSIONED": 5, + "NODE_STATUS_DRAINING": 6, } func (x NodeLivenessStatus) String() string { @@ -223,42 +225,43 @@ func init() { } var fileDescriptor_d1574f4e989c4767 = []byte{ - // 554 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x4b, 0x8b, 0xd3, 0x50, - 0x14, 0xc7, 0x73, 0xfb, 0x9a, 0x72, 0x2b, 0x33, 0xe1, 0x76, 0xc0, 0x12, 0x21, 0x09, 0xea, 0xa2, - 0x0e, 0xc3, 0x0d, 0x74, 0x5c, 0xe9, 0x2a, 0x9d, 0x04, 0x09, 0xb6, 0x29, 0xb4, 0x9d, 0x11, 0xc6, - 0x45, 0xc9, 0xe3, 0x92, 0x84, 0x3e, 0x6e, 0x48, 0xd2, 0xa2, 0x5f, 0xa1, 0x2b, 0x71, 0xe5, 0xa6, - 0xd0, 0x85, 0x0b, 0x3f, 0x82, 0x1f, 0xa1, 0x2b, 0x99, 0xe5, 0xac, 0x8a, 0xb6, 0x1b, 0x3f, 0x83, - 0x2b, 0x49, 0x1f, 0xe9, 0x03, 0x04, 0x77, 0xff, 0x73, 0x38, 0xff, 0x5f, 0xf2, 0x3f, 0xe7, 0x42, - 0xdc, 0x1d, 0x49, 0xdd, 0x51, 0x48, 0x82, 0x11, 0x09, 0xa4, 0x9e, 0x37, 0x22, 0x03, 0x12, 0x86, - 0x89, 0xf0, 0xcd, 0x44, 0x62, 0x3f, 0xa0, 0x11, 0x45, 0x2f, 0x2c, 0x6a, 0x75, 0x03, 0x6a, 0x58, - 0x2e, 0xee, 0x8e, 0xf0, 0xd6, 0x89, 0x93, 0xa9, 0x9d, 0x93, 0x13, 0x86, 0x91, 0xd7, 0x93, 0xdc, - 0x9e, 0x25, 0xf5, 0x88, 0x63, 0x58, 0x1f, 0x3b, 0x91, 0xd7, 0x27, 0x61, 0x64, 0xf4, 0xfd, 0x35, - 0x8b, 0x3b, 0x77, 0xa8, 0x43, 0x57, 0x52, 0x8a, 0xd5, 0xba, 0xfb, 0xf4, 0x47, 0x0a, 0xe6, 0x6b, - 0x1b, 0x0a, 0xba, 0x83, 0x27, 0x03, 0x6a, 0x93, 0x8e, 0x67, 0x97, 0x80, 0x08, 0xca, 0xd9, 0xaa, - 0xbc, 0x98, 0x0b, 0x39, 0x9d, 0xda, 0x44, 0x53, 0xfe, 0xcc, 0x85, 0x2b, 0xc7, 0x8b, 0xdc, 0xa1, - 0x89, 0x2d, 0xda, 0x97, 0x92, 0x1f, 0xb3, 0xcd, 0x9d, 0x96, 0xfc, 0xae, 0x23, 0xad, 0x94, 0x6f, - 0xe2, 0xb5, 0xad, 0x99, 0x8b, 0x89, 0x9a, 0x8d, 0xce, 0x61, 0x96, 0xf8, 0xd4, 0x72, 0x4b, 0x29, - 0x11, 0x94, 0xd3, 0xcd, 0x75, 0x81, 0x34, 0x08, 0xc9, 
0x07, 0xdf, 0x0b, 0x8c, 0xc8, 0xa3, 0x83, - 0x52, 0x5a, 0x04, 0xe5, 0x42, 0xe5, 0x19, 0xde, 0xa5, 0x8e, 0x43, 0x61, 0xb7, 0x67, 0xe1, 0xda, - 0x2a, 0x54, 0x7b, 0x9b, 0xa9, 0x9a, 0x99, 0xcd, 0x05, 0xa6, 0xb9, 0x67, 0x46, 0x1c, 0xcc, 0xdb, - 0x81, 0xe1, 0x0d, 0xbc, 0x81, 0x53, 0xca, 0x88, 0xa0, 0x9c, 0x6f, 0x26, 0x35, 0x7a, 0x0f, 0x61, - 0x9f, 0xf4, 0x4d, 0x12, 0x84, 0xae, 0xe7, 0x97, 0xb2, 0x22, 0x28, 0x9f, 0x56, 0x5e, 0xe3, 0xff, - 0x5e, 0x2e, 0xae, 0x27, 0xe6, 0x56, 0x64, 0x44, 0xc3, 0xb0, 0xb9, 0x87, 0x7b, 0xf5, 0xe8, 0xcb, - 0x54, 0x60, 0xbe, 0x4f, 0x05, 0xf0, 0x7b, 0x2a, 0x80, 0x8b, 0x3a, 0x64, 0x8f, 0xa7, 0x11, 0x84, - 0x39, 0xf9, 0xba, 0xad, 0xdd, 0xaa, 0x2c, 0x83, 0x8a, 0xf0, 0x4c, 0x51, 0xaf, 0x1b, 0xf5, 0xba, - 0xd6, 0x6a, 0x69, 0x0d, 0x5d, 0xd3, 0xdf, 0xb0, 0x00, 0x21, 0x78, 0xba, 0xdf, 0x54, 0x15, 0x36, - 0xc5, 0x65, 0xbe, 0x7d, 0xe5, 0x99, 0x8b, 0xcf, 0x29, 0x88, 0xe2, 0x4d, 0x6e, 0x6f, 0xb4, 0x21, - 0x3e, 0x87, 0x45, 0xbd, 0xa1, 0xa8, 0x9d, 0x56, 0x5b, 0x6e, 0xdf, 0xb4, 0x3a, 0x37, 0xfa, 0x5b, - 0xbd, 0xf1, 0x4e, 0x67, 0x19, 0xae, 0x30, 0x9e, 0x88, 0x27, 0x9b, 0x12, 0xf1, 0x90, 0xdd, 0x9f, - 0x52, 0x54, 0x59, 0x61, 0x01, 0x97, 0x1f, 0x4f, 0xc4, 0x4c, 0xac, 0xd1, 0x25, 0x7c, 0x7c, 0x48, - 0x91, 0x6f, 0x65, 0xad, 0x26, 0x57, 0x6b, 0x2a, 0x9b, 0xe2, 0xce, 0xc6, 0x13, 0xb1, 0xb0, 0xd7, - 0x3a, 0xa6, 0xd5, 0xe2, 0x3c, 0xe9, 0x35, 0x2d, 0xd6, 0xe8, 0x25, 0x7c, 0x72, 0xf8, 0xb5, 0xc3, - 0x94, 0x19, 0xae, 0x38, 0x9e, 0x88, 0xc7, 0xe1, 0x51, 0x05, 0x72, 0xff, 0x72, 0xa9, 0x0a, 0x9b, - 0xe5, 0xd0, 0x78, 0x22, 0x1e, 0x2d, 0xa7, 0x7a, 0x39, 0xfb, 0xc5, 0x33, 0xb3, 0x05, 0x0f, 0xee, - 0x17, 0x3c, 0x78, 0x58, 0xf0, 0xe0, 0xe7, 0x82, 0x07, 0x9f, 0x96, 0x3c, 0x73, 0xbf, 0xe4, 0x99, - 0x87, 0x25, 0xcf, 0xdc, 0xc1, 0xdd, 0xf1, 0xcc, 0xdc, 0xea, 0xa5, 0x5f, 0xfd, 0x0d, 0x00, 0x00, - 0xff, 0xff, 0x04, 0x9b, 0xb9, 0xac, 0x7d, 0x03, 0x00, 0x00, + // 573 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xbd, 0x6a, 0xdb, 0x50, + 0x14, 0xc7, 0x75, 0x1d, 0xdb, 0x31, 0x37, 0x21, 0x11, 0x37, 0x81, 0x1a, 0x15, 0x24, 0xd1, 0x96, + 0xe2, 0x86, 0x70, 0x05, 0x4e, 0xa7, 0x76, 0x92, 0x23, 0x51, 0x44, 0x6d, 0x19, 0x64, 0x27, 0x85, + 0x74, 0x30, 0xfa, 0xb8, 0x48, 0xc2, 0x1f, 0x12, 0x92, 0x6c, 0xda, 0x57, 0xd0, 0xd4, 0xb1, 0x8b, + 0xc0, 0x43, 0x87, 0x3e, 0x42, 0xe9, 0x13, 0x78, 0x2a, 0x19, 0x33, 0x99, 0xd6, 0x5e, 0xfa, 0x0c, + 0x9d, 0x8a, 0xfc, 0x21, 0x7f, 0x40, 0xa1, 0xdb, 0xff, 0x1c, 0xce, 0xff, 0x77, 0xf5, 0x3f, 0x07, + 0x41, 0xdc, 0x1d, 0x09, 0xdd, 0x51, 0x48, 0x82, 0x11, 0x09, 0x84, 0x9e, 0x3b, 0x22, 0x03, 0x12, + 0x86, 0x99, 0xf0, 0x8d, 0x4c, 0x62, 0x3f, 0xf0, 0x22, 0x0f, 0xbd, 0x30, 0x3d, 0xb3, 0x1b, 0x78, + 0xba, 0xe9, 0xe0, 0xee, 0x08, 0xaf, 0x9d, 0x38, 0x9b, 0xda, 0x38, 0x19, 0x6e, 0x18, 0xb9, 0x3d, + 0xc1, 0xe9, 0x99, 0x42, 0x8f, 0xd8, 0xba, 0xf9, 0xb1, 0x13, 0xb9, 0x7d, 0x12, 0x46, 0x7a, 0xdf, + 0x5f, 0xb2, 0x98, 0x73, 0xdb, 0xb3, 0xbd, 0x85, 0x14, 0x52, 0xb5, 0xec, 0x3e, 0xf9, 0x91, 0x83, + 0xa5, 0xfa, 0x8a, 0x82, 0xee, 0xe0, 0xe1, 0xc0, 0xb3, 0x48, 0xc7, 0xb5, 0xca, 0x80, 0x07, 0x95, + 0x42, 0x4d, 0x9c, 0x4d, 0xb9, 0xa2, 0xea, 0x59, 0x44, 0x91, 0xfe, 0x4c, 0xb9, 0x2b, 0xdb, 0x8d, + 0x9c, 0xa1, 0x81, 0x4d, 0xaf, 0x2f, 0x64, 0x1f, 0x66, 0x19, 0x1b, 0x2d, 0xf8, 0x5d, 0x5b, 0x58, + 0x28, 0xdf, 0xc0, 0x4b, 0x9b, 0x56, 0x4c, 0x89, 0x8a, 0x85, 0xce, 0x61, 0x81, 0xf8, 0x9e, 0xe9, + 0x94, 0x73, 0x3c, 0xa8, 0x1c, 0x68, 0xcb, 0x02, 0x29, 0x10, 0x92, 0x0f, 0xbe, 0x1b, 0xe8, 0x91, + 0xeb, 0x0d, 0xca, 0x07, 0x3c, 0xa8, 0x1c, 0x55, 0x9f, 0xe2, 0x4d, 
0xea, 0x34, 0x14, 0x76, 0x7a, + 0x26, 0xae, 0x2f, 0x42, 0xb5, 0xd7, 0x99, 0x6a, 0xf9, 0xc9, 0x94, 0xa3, 0xb4, 0x2d, 0x33, 0x62, + 0x60, 0xc9, 0x0a, 0x74, 0x77, 0xe0, 0x0e, 0xec, 0x72, 0x9e, 0x07, 0x95, 0x92, 0x96, 0xd5, 0xe8, + 0x3d, 0x84, 0x7d, 0xd2, 0x37, 0x48, 0x10, 0x3a, 0xae, 0x5f, 0x2e, 0xf0, 0xa0, 0x72, 0x52, 0x7d, + 0x8d, 0xff, 0x7b, 0xb9, 0xb8, 0x91, 0x99, 0x5b, 0x91, 0x1e, 0x0d, 0x43, 0x6d, 0x0b, 0xf7, 0xea, + 0xf8, 0xf3, 0x98, 0xa3, 0xbe, 0x8d, 0x39, 0xf0, 0x7b, 0xcc, 0x81, 0x8b, 0x06, 0xa4, 0xf7, 0xa7, + 0x11, 0x84, 0x45, 0xf1, 0xba, 0xad, 0xdc, 0xca, 0x34, 0x85, 0xce, 0xe0, 0xa9, 0x24, 0x5f, 0x37, + 0x1b, 0x0d, 0xa5, 0xd5, 0x52, 0x9a, 0xaa, 0xa2, 0xbe, 0xa1, 0x01, 0x42, 0xf0, 0x64, 0xbb, 0x29, + 0x4b, 0x74, 0x8e, 0xc9, 0x7f, 0xfd, 0xc2, 0x52, 0x17, 0xdf, 0x73, 0x10, 0xa5, 0x9b, 0x5c, 0xdf, + 0x68, 0x45, 0x7c, 0x06, 0xcf, 0xd4, 0xa6, 0x24, 0x77, 0x5a, 0x6d, 0xb1, 0x7d, 0xd3, 0xea, 0xdc, + 0xa8, 0x6f, 0xd5, 0xe6, 0x3b, 0x95, 0xa6, 0x98, 0xa3, 0x38, 0xe1, 0x0f, 0x57, 0x25, 0x62, 0x21, + 0xbd, 0x3d, 0x25, 0xc9, 0xa2, 0x44, 0x03, 0xa6, 0x14, 0x27, 0x7c, 0x3e, 0xd5, 0xe8, 0x12, 0x3e, + 0xda, 0xa5, 0x88, 0xb7, 0xa2, 0x52, 0x17, 0x6b, 0x75, 0x99, 0xce, 0x31, 0xa7, 0x71, 0xc2, 0x1f, + 0x6d, 0xb5, 0xf6, 0x69, 0xf5, 0x34, 0xcf, 0xc1, 0x92, 0x96, 0x6a, 0xf4, 0x12, 0x3e, 0xde, 0x7d, + 0x6d, 0x37, 0x65, 0x9e, 0x39, 0x8b, 0x13, 0x7e, 0x3f, 0x3c, 0xaa, 0x42, 0xe6, 0x5f, 0x2e, 0x59, + 0xa2, 0x0b, 0x0c, 0x8a, 0x13, 0x7e, 0x6f, 0x39, 0xe8, 0x39, 0x3c, 0xdf, 0xf1, 0x68, 0xa2, 0xb2, + 0x78, 0xa2, 0xc8, 0x1c, 0xc7, 0x09, 0x5f, 0x5a, 0xd7, 0xb5, 0xcb, 0xc9, 0x2f, 0x96, 0x9a, 0xcc, + 0x58, 0x70, 0x3f, 0x63, 0xc1, 0xc3, 0x8c, 0x05, 0x3f, 0x67, 0x2c, 0xf8, 0x34, 0x67, 0xa9, 0xfb, + 0x39, 0x4b, 0x3d, 0xcc, 0x59, 0xea, 0x0e, 0x6e, 0x8e, 0x6c, 0x14, 0x17, 0x7f, 0xc4, 0xd5, 0xdf, + 0x00, 0x00, 0x00, 0xff, 0xff, 0x91, 0x2b, 0x1b, 0xb5, 0xa5, 0x03, 0x00, 0x00, } func (this *Liveness) Equal(that interface{}) bool { diff --git a/pkg/kv/kvserver/liveness/livenesspb/liveness.proto b/pkg/kv/kvserver/liveness/livenesspb/liveness.proto index d5b81bc696a8..ae74c9a5b9d1 100644 --- a/pkg/kv/kvserver/liveness/livenesspb/liveness.proto +++ b/pkg/kv/kvserver/liveness/livenesspb/liveness.proto @@ -125,8 +125,6 @@ enum NodeLivenessStatus { // UNAVAILABLE indicates that the node is unavailable - it has not updated its // liveness record recently enough to be considered live, but has not been // unavailable long enough to be considered dead. - // UNAVAILABLE is also reported for nodes whose descriptor is marked - // as draining. NODE_STATUS_UNAVAILABLE = 2 [(gogoproto.enumvalue_customname) = "UNAVAILABLE"]; // LIVE indicates a live node. NODE_STATUS_LIVE = 3 [(gogoproto.enumvalue_customname) = "LIVE"]; @@ -135,4 +133,6 @@ enum NodeLivenessStatus { // DECOMMISSIONED indicates a node that has finished the decommissioning // process. NODE_STATUS_DECOMMISSIONED = 5 [(gogoproto.enumvalue_customname) = "DECOMMISSIONED"]; + // DRAINING indicates a node that is in the process of draining. 
+  NODE_STATUS_DRAINING = 6 [(gogoproto.enumvalue_customname) = "DRAINING"];
 }
diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go
index 73f88915f2e4..96fc4f05e257 100644
--- a/pkg/kv/kvserver/store_pool.go
+++ b/pkg/kv/kvserver/store_pool.go
@@ -62,6 +62,29 @@ var FailedReservationsTimeout = settings.RegisterDurationSetting(
 	settings.NonNegativeDuration,
 )
 
+const timeAfterStoreSuspectSettingName = "server.time_after_store_suspect"
+
+// TimeAfterStoreSuspect measures how long we consider a store suspect since
+// its last failure.
+var TimeAfterStoreSuspect = settings.RegisterDurationSetting(
+	timeAfterStoreSuspectSettingName,
+	"the amount of time we consider a store suspect for after it fails a node liveness heartbeat."+
+		" A suspect node would not receive any new replicas or lease transfers, but will keep the replicas it has.",
+	30*time.Second,
+	settings.NonNegativeDuration,
+	func(v time.Duration) error {
+		// We enforce a maximum value of 5 minutes for this setting, as setting it
+		// too high may result in a prolonged period of unavailability as a recovered
+		// store will not be able to acquire leases or replicas for a long time.
+		const maxTimeAfterStoreSuspect = 5 * time.Minute
+		if v > maxTimeAfterStoreSuspect {
+			return errors.Errorf("cannot set %s to more than %v: %v",
+				timeAfterStoreSuspectSettingName, maxTimeAfterStoreSuspect, v)
+		}
+		return nil
+	},
+)
+
 const timeUntilStoreDeadSettingName = "server.time_until_store_dead"
 
 // TimeUntilStoreDead wraps "server.time_until_store_dead".
@@ -108,7 +131,7 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *liveness.NodeLiveness) NodeLive
 ) livenesspb.NodeLivenessStatus {
 	liveness, ok := nodeLiveness.GetLiveness(nodeID)
 	if !ok {
-		return livenesspb.NodeLivenessStatus_UNAVAILABLE
+		return livenesspb.NodeLivenessStatus_UNKNOWN
 	}
 	return LivenessStatus(liveness.Liveness, now, timeUntilStoreDead)
 }
@@ -130,9 +153,9 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *liveness.NodeLiveness) NodeLive
 //
 // - Let's say a node write its liveness record at tWrite. It sets the
 //   Expiration field of the record as tExp=tWrite+livenessThreshold.
-//   The node is considered LIVE (or DECOMMISSIONING or UNAVAILABLE if draining).
+//   The node is considered LIVE (or DECOMMISSIONING or DRAINING).
 // - At tExp, the IsLive() method starts returning false. The state becomes
-//   UNAVAILABLE (or stays DECOMMISSIONING or UNAVAILABLE if draining).
+//   UNAVAILABLE (or stays DECOMMISSIONING or DRAINING).
 // - Once threshold passes, the node is considered DEAD (or DECOMMISSIONED).
 //
 // NB: There's a bit of discrepancy between what "Decommissioned" represents, as
@@ -155,7 +178,7 @@ func LivenessStatus(
 		return livenesspb.NodeLivenessStatus_DECOMMISSIONING
 	}
 	if l.Draining {
-		return livenesspb.NodeLivenessStatus_UNAVAILABLE
+		return livenesspb.NodeLivenessStatus_DRAINING
 	}
 	if l.IsLive(now) {
 		return livenesspb.NodeLivenessStatus_LIVE
@@ -174,6 +197,12 @@ type storeDetail struct {
 	// lastUpdatedTime is set when a store is first consulted and every time
 	// gossip arrives for a store.
 	lastUpdatedTime time.Time
+	// lastUnavailable is set when it's detected that a store was unavailable,
+	// i.e. failed liveness.
+	lastUnavailable time.Time
+	// lastAvailable is set when it's detected that a store was available,
+	// i.e. we got a liveness heartbeat.
+	lastAvailable time.Time
 }
 
 // isThrottled returns whether the store is currently throttled.
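The setting and the two timestamps introduced above drive the suspect determination implemented in the hunks that follow. For orientation, the check boils down to a single time comparison; the following is a minimal standalone sketch in plain Go with simplified names, not the kvserver code itself:

package main

import (
	"fmt"
	"time"
)

// isSuspect mirrors the predicate the patch adds to storeDetail: a store that
// failed a liveness heartbeat stays suspect until suspectDuration (the new
// server.time_after_store_suspect setting, 30s by default) has elapsed since
// the last observed failure. This is a standalone simplification.
func isSuspect(lastUnavailable, now time.Time, suspectDuration time.Duration) bool {
	return lastUnavailable.Add(suspectDuration).After(now)
}

func main() {
	const timeAfterStoreSuspect = 30 * time.Second
	failedAt := time.Now()
	fmt.Println(isSuspect(failedAt, failedAt.Add(10*time.Second), timeAfterStoreSuspect)) // true: still inside the window
	fmt.Println(isSuspect(failedAt, failedAt.Add(45*time.Second), timeAfterStoreSuspect)) // false: the window has passed
}

In other words, once a store has failed liveness it remains suspect for the full server.time_after_store_suspect window after its most recent failure, even if it is heartbeating again.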
@@ -181,6 +210,13 @@ func (sd storeDetail) isThrottled(now time.Time) bool {
 	return sd.throttledUntil.After(now)
 }
 
+// isSuspect returns whether the store is currently suspect. We measure that by
+// looking at the time it was last unavailable, making sure we have not seen any
+// failures for a period of time defined by TimeAfterStoreSuspect.
+func (sd storeDetail) isSuspect(now time.Time, suspectDuration time.Duration) bool {
+	return sd.lastUnavailable.Add(suspectDuration).After(now)
+}
+
 // storeStatus is the current status of a store.
 type storeStatus int
 
@@ -201,10 +237,12 @@ const (
 	storeStatusAvailable
 	// The store is decommissioning.
 	storeStatusDecommissioning
+	// The store failed its liveness heartbeat recently and is considered suspect.
+	storeStatusSuspect
 )
 
 func (sd *storeDetail) status(
-	now time.Time, threshold time.Duration, nl NodeLivenessFunc,
+	now time.Time, threshold time.Duration, nl NodeLivenessFunc, suspectDuration time.Duration,
 ) storeStatus {
 	// The store is considered dead if it hasn't been updated via gossip
 	// within the liveness threshold. Note that lastUpdatedTime is set
 	// even before the first gossip arrives for a store.
 	deadAsOf := sd.lastUpdatedTime.Add(threshold)
 	if now.After(deadAsOf) {
+		// Wipe out the lastAvailable timestamp, so that once a node comes back
+		// from the dead we don't consider it suspect.
+		sd.lastAvailable = time.Time{}
 		return storeStatusDead
 	}
 	// If there's no descriptor (meaning no gossip ever arrived for this
@@ -227,7 +268,23 @@ func (sd *storeDetail) status(
 		return storeStatusDead
 	case livenesspb.NodeLivenessStatus_DECOMMISSIONING:
 		return storeStatusDecommissioning
-	case livenesspb.NodeLivenessStatus_UNKNOWN, livenesspb.NodeLivenessStatus_UNAVAILABLE:
+	case livenesspb.NodeLivenessStatus_UNAVAILABLE:
+		// We don't want to suspect a node on startup or when it's first added to a
+		// cluster, because we don't know its liveness yet. A node is only considered
+		// suspect if it's been alive and fails to heartbeat liveness.
+		if !sd.lastAvailable.IsZero() {
+			sd.lastUnavailable = now
+			return storeStatusSuspect
+		}
+		return storeStatusUnknown
+	case livenesspb.NodeLivenessStatus_UNKNOWN:
+		return storeStatusUnknown
+	case livenesspb.NodeLivenessStatus_DRAINING:
+		// Wipe out the lastAvailable timestamp, so if this node comes back after a
+		// graceful restart it will not be considered as suspect. This is best effort
+		// and we may not see a store in this state. To help with that we perform
+		// a similar clear of lastAvailable on a DEAD store.
+		sd.lastAvailable = time.Time{}
 		return storeStatusUnknown
 	}
 
@@ -235,6 +292,10 @@
 		return storeStatusThrottled
 	}
 
+	if sd.isSuspect(now, suspectDuration) {
+		return storeStatusSuspect
+	}
+	sd.lastAvailable = now
 	return storeStatusAvailable
 }
 
@@ -273,11 +334,11 @@ type StorePool struct {
 		nodeLocalities map[roachpb.NodeID]localityWithString
 	}
 
-	// isNodeReadyForRoutineReplicaTransferInternal returns true iff the
-	// node is live and thus a good candidate to receive a replica.
+	// isStoreReadyForRoutineReplicaTransfer returns true if the
+	// store is live and thus a good candidate to receive a replica.
 	// This is defined as a closure reference here instead
 	// of a regular method so it can be overridden in tests.
- isNodeReadyForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool + isStoreReadyForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool } // NewStorePool creates a StorePool and registers the store updating callback @@ -301,7 +362,7 @@ func NewStorePool( startTime: clock.PhysicalTime(), deterministic: deterministic, } - sp.isNodeReadyForRoutineReplicaTransfer = sp.isNodeReadyForRoutineReplicaTransferInternal + sp.isStoreReadyForRoutineReplicaTransfer = sp.isStoreReadyForRoutineReplicaTransferInternal sp.detailsMu.storeDetails = make(map[roachpb.StoreID]*storeDetail) sp.localitiesMu.nodeLocalities = make(map[roachpb.NodeID]localityWithString) @@ -327,11 +388,12 @@ func (sp *StorePool) String() string { var buf bytes.Buffer now := sp.clock.Now().GoTime() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, id := range ids { detail := sp.detailsMu.storeDetails[id] fmt.Fprintf(&buf, "%d", id) - status := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn) + status := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect) if status != storeStatusAvailable { fmt.Fprintf(&buf, " (status=%d)", status) } @@ -496,10 +558,11 @@ func (sp *StorePool) decommissioningReplicas( // take clock signals from remote nodes into consideration. now := sp.clock.Now().GoTime() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, repl := range repls { detail := sp.getStoreDetailLocked(repl.StoreID) - switch detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn) { + switch detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect) { case storeStatusDecommissioning: decommissioningReplicas = append(decommissioningReplicas, repl) } @@ -542,6 +605,42 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error return false, deadAsOf.Sub(now), nil } +// IsSuspect returns true if the node is suspected by the store pool or an error +// if the store is not found in the pool. +func (sp *StorePool) IsSuspect(storeID roachpb.StoreID) (bool, error) { + status, err := sp.storeStatus(storeID) + if err != nil { + return false, err + } + return status == storeStatusSuspect, nil +} + +// IsLive returns true if the node is considered alive by the store pool or an error +// if the store is not found in the pool. +func (sp *StorePool) IsLive(storeID roachpb.StoreID) (bool, error) { + status, err := sp.storeStatus(storeID) + if err != nil { + return false, err + } + return status == storeStatusAvailable, nil +} + +func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { + sp.detailsMu.Lock() + defer sp.detailsMu.Unlock() + + sd, ok := sp.detailsMu.storeDetails[storeID] + if !ok { + return storeStatusUnknown, errors.Errorf("store %d was not found", storeID) + } + // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to + // take clock signals from remote nodes into consideration. + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) + return sd.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect), nil +} + // liveAndDeadReplicas divides the provided repls slice into two slices: the // first for live replicas, and the second for dead replicas. 
// Replicas for which liveness or deadness cannot be ascertained are excluded @@ -555,11 +654,12 @@ func (sp *StorePool) liveAndDeadReplicas( now := sp.clock.Now().GoTime() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, repl := range repls { detail := sp.getStoreDetailLocked(repl.StoreID) // Mark replica as dead if store is dead. - status := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn) + status := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect) switch status { case storeStatusDead: deadReplicas = append(deadReplicas, repl) @@ -569,7 +669,7 @@ func (sp *StorePool) liveAndDeadReplicas( // We count decommissioning replicas to be alive because they are readable // and should be used for up-replication if necessary. liveReplicas = append(liveReplicas, repl) - case storeStatusUnknown: + case storeStatusUnknown, storeStatusSuspect: // No-op. default: log.Fatalf(context.TODO(), "unknown store status %d", status) @@ -682,6 +782,12 @@ const ( // for replica rebalancing, for example, but can still be considered for lease // rebalancing. storeFilterThrottled + // storeFilterSuspect requests that the returned store list additionally + // exclude stores that have been suspected as unhealthy. We dont want unhealthy + // stores to be considered for rebalancing or for lease transfers. i.e. we dont + // actively shift leases or replicas away from them, but we dont allow them to + // get any new ones until they get better. + storeFilterSuspect ) type throttledStoreReasons []string @@ -691,14 +797,14 @@ type throttledStoreReasons []string // according to the provided storeFilter. It also returns the total number of // alive and throttled stores. func (sp *StorePool) getStoreList(filter storeFilter) (StoreList, int, throttledStoreReasons) { - sp.detailsMu.RLock() - defer sp.detailsMu.RUnlock() + sp.detailsMu.Lock() + defer sp.detailsMu.Unlock() var storeIDs roachpb.StoreIDSlice for storeID := range sp.detailsMu.storeDetails { storeIDs = append(storeIDs, storeID) } - return sp.getStoreListFromIDsRLocked(storeIDs, filter) + return sp.getStoreListFromIDsLocked(storeIDs, filter) } // getStoreListFromIDs is the same function as getStoreList but only returns stores @@ -706,14 +812,14 @@ func (sp *StorePool) getStoreList(filter storeFilter) (StoreList, int, throttled func (sp *StorePool) getStoreListFromIDs( storeIDs roachpb.StoreIDSlice, filter storeFilter, ) (StoreList, int, throttledStoreReasons) { - sp.detailsMu.RLock() - defer sp.detailsMu.RUnlock() - return sp.getStoreListFromIDsRLocked(storeIDs, filter) + sp.detailsMu.Lock() + defer sp.detailsMu.Unlock() + return sp.getStoreListFromIDsLocked(storeIDs, filter) } // getStoreListFromIDsRLocked is the same function as getStoreList but requires // that the detailsMU read lock is held. -func (sp *StorePool) getStoreListFromIDsRLocked( +func (sp *StorePool) getStoreListFromIDsLocked( storeIDs roachpb.StoreIDSlice, filter storeFilter, ) (StoreList, int, throttledStoreReasons) { if sp.deterministic { @@ -728,6 +834,7 @@ func (sp *StorePool) getStoreListFromIDsRLocked( now := sp.clock.Now().GoTime() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) for _, storeID := range storeIDs { detail, ok := sp.detailsMu.storeDetails[storeID] @@ -735,7 +842,7 @@ func (sp *StorePool) getStoreListFromIDsRLocked( // Do nothing; this store is not in the StorePool. 
continue } - switch s := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn); s { + switch s := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect); s { case storeStatusThrottled: aliveStoreCount++ throttled = append(throttled, detail.throttledBecause) @@ -745,6 +852,12 @@ func (sp *StorePool) getStoreListFromIDsRLocked( case storeStatusAvailable: aliveStoreCount++ storeDescriptors = append(storeDescriptors, *detail.desc) + case storeStatusSuspect: + aliveStoreCount++ + throttled = append(throttled, "throttled because the node is considered suspect") + if filter != storeFilterThrottled && filter != storeFilterSuspect { + storeDescriptors = append(storeDescriptors, *detail.desc) + } case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning: // Do nothing; this store cannot be used. default: @@ -852,23 +965,23 @@ func (sp *StorePool) getNodeLocalityString(nodeID roachpb.NodeID) string { return locality.str } -func (sp *StorePool) isNodeReadyForRoutineReplicaTransferInternal( - ctx context.Context, targetNodeID roachpb.NodeID, +func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal( + ctx context.Context, targetStoreID roachpb.StoreID, ) bool { - timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) - // We use Now().GoTime() instead of PhysicalTime() as per the - // comment on top of IsLive(). - now := sp.clock.Now().GoTime() - - liveness := sp.nodeLivenessFn( - targetNodeID, now, timeUntilStoreDead) - res := liveness == livenesspb.NodeLivenessStatus_LIVE - if res { + status, err := sp.storeStatus(targetStoreID) + if err != nil { + return false + } + switch status { + case storeStatusThrottled, storeStatusAvailable: log.VEventf(ctx, 3, - "n%d is a live target, candidate for rebalancing", targetNodeID) - } else { + "s%d is a live target, candidate for rebalancing", targetStoreID) + return true + case storeStatusDead, storeStatusUnknown, storeStatusDecommissioning, storeStatusSuspect: log.VEventf(ctx, 3, - "not considering non-live node n%d (%s)", targetNodeID, liveness) + "not considering non-live store s%d (%v)", targetStoreID, status) + return false + default: + panic(fmt.Sprintf("unknown store status: %d", status)) } - return res } diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index bce079ceeccd..5fdb97807870 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -256,6 +256,11 @@ func TestStorePoolGetStoreList(t *testing.T) { Node: roachpb.NodeDescriptor{NodeID: 7}, Attrs: roachpb.Attributes{Attrs: required}, } + suspectedStore := roachpb.StoreDescriptor{ + StoreID: 8, + Node: roachpb.NodeDescriptor{NodeID: 8}, + Attrs: roachpb.Attributes{Attrs: required}, + } // Gossip and mark all alive initially. sg.GossipStores([]*roachpb.StoreDescriptor{ @@ -265,9 +270,10 @@ func TestStorePoolGetStoreList(t *testing.T) { &emptyStore, &deadStore, &declinedStore, + &suspectedStore, // absentStore is purposefully not gossiped. }, t) - for i := 1; i <= 7; i++ { + for i := 1; i <= 8; i++ { mnl.setNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE) } @@ -276,6 +282,9 @@ func TestStorePoolGetStoreList(t *testing.T) { sp.detailsMu.Lock() // Set declinedStore as throttled. sp.detailsMu.storeDetails[declinedStore.StoreID].throttledUntil = sp.clock.Now().GoTime().Add(time.Hour) + // Set suspectedStore as suspected. 
+ sp.detailsMu.storeDetails[suspectedStore.StoreID].lastAvailable = sp.clock.Now().GoTime() + sp.detailsMu.storeDetails[suspectedStore.StoreID].lastUnavailable = sp.clock.Now().GoTime() sp.detailsMu.Unlock() // No filter or limited set of store IDs. @@ -288,9 +297,10 @@ func TestStorePoolGetStoreList(t *testing.T) { int(matchingStore.StoreID), int(supersetStore.StoreID), int(declinedStore.StoreID), + int(suspectedStore.StoreID), }, - /* expectedAliveStoreCount */ 5, - /* expectedThrottledStoreCount */ 1, + /* expectedAliveStoreCount */ 6, + /* expectedThrottledStoreCount */ 2, ); err != nil { t.Error(err) } @@ -305,8 +315,25 @@ func TestStorePoolGetStoreList(t *testing.T) { int(matchingStore.StoreID), int(supersetStore.StoreID), }, - /* expectedAliveStoreCount */ 5, - /* expectedThrottledStoreCount */ 1, + /* expectedAliveStoreCount */ 6, + /* expectedThrottledStoreCount */ 2, + ); err != nil { + t.Error(err) + } + + // Filter out suspected stores but don't limit the set of store IDs. + if err := verifyStoreList( + sp, + constraints, + nil, /* storeIDs */ + storeFilterSuspect, + []int{ + int(matchingStore.StoreID), + int(supersetStore.StoreID), + int(declinedStore.StoreID), + }, + /* expectedAliveStoreCount */ 6, + /* expectedThrottledStoreCount */ 2, ); err != nil { t.Error(err) } @@ -315,6 +342,7 @@ func TestStorePoolGetStoreList(t *testing.T) { matchingStore.StoreID, declinedStore.StoreID, absentStore.StoreID, + suspectedStore.StoreID, } // No filter but limited to limitToStoreIDs. @@ -327,9 +355,10 @@ func TestStorePoolGetStoreList(t *testing.T) { []int{ int(matchingStore.StoreID), int(declinedStore.StoreID), + int(suspectedStore.StoreID), }, - /* expectedAliveStoreCount */ 2, - /* expectedThrottledStoreCount */ 1, + /* expectedAliveStoreCount */ 3, + /* expectedThrottledStoreCount */ 2, ); err != nil { t.Error(err) } @@ -344,8 +373,25 @@ func TestStorePoolGetStoreList(t *testing.T) { []int{ int(matchingStore.StoreID), }, - /* expectedAliveStoreCount */ 2, - /* expectedThrottledStoreCount */ 1, + /* expectedAliveStoreCount */ 3, + /* expectedThrottledStoreCount */ 2, + ); err != nil { + t.Error(err) + } + + // Filter out suspected stores and limit to limitToStoreIDs. + // Note that suspectedStore is not included. 
+ if err := verifyStoreList( + sp, + constraints, + limitToStoreIDs, + storeFilterSuspect, + []int{ + int(matchingStore.StoreID), + int(declinedStore.StoreID), + }, + /* expectedAliveStoreCount */ 3, + /* expectedThrottledStoreCount */ 2, ); err != nil { t.Error(err) } @@ -812,6 +858,64 @@ func TestStorePoolThrottle(t *testing.T) { } } +func TestStorePoolSuspected(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + stopper, g, _, sp, mnl := createTestStorePool( + TestTimeUntilStoreDeadOff, false, /* deterministic */ + func() int { return 10 }, /* nodeCount */ + livenesspb.NodeLivenessStatus_DEAD) + defer stopper.Stop(context.Background()) + + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(uniqueStore, t) + store := uniqueStore[0] + + now := sp.clock.Now().GoTime() + timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) + timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) + + mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE) + sp.detailsMu.Lock() + detail := sp.getStoreDetailLocked(store.StoreID) + s := detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect) + sp.detailsMu.Unlock() + require.Equal(t, s, storeStatusAvailable) + require.False(t, detail.lastAvailable.IsZero()) + require.True(t, detail.lastUnavailable.IsZero()) + + mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_UNAVAILABLE) + sp.detailsMu.Lock() + s = detail.status(now, timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect) + sp.detailsMu.Unlock() + require.Equal(t, s, storeStatusSuspect) + require.False(t, detail.lastAvailable.IsZero()) + require.False(t, detail.lastUnavailable.IsZero()) + + mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE) + sp.detailsMu.Lock() + s = detail.status(now.Add(timeAfterStoreSuspect).Add(time.Millisecond), + timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect) + sp.detailsMu.Unlock() + require.Equal(t, s, storeStatusAvailable) + + mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_DRAINING) + sp.detailsMu.Lock() + s = detail.status(now, + timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect) + sp.detailsMu.Unlock() + require.Equal(t, s, storeStatusUnknown) + require.True(t, detail.lastAvailable.IsZero()) + + mnl.setNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE) + sp.detailsMu.Lock() + s = detail.status(now.Add(timeAfterStoreSuspect).Add(time.Millisecond), + timeUntilStoreDead, sp.nodeLivenessFn, timeAfterStoreSuspect) + sp.detailsMu.Unlock() + require.Equal(t, s, storeStatusAvailable) + require.False(t, detail.lastAvailable.IsZero()) +} + func TestGetLocalities(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1133,7 +1237,7 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { }, Draining: true, }, - expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, + expected: livenesspb.NodeLivenessStatus_DRAINING, }, } { t.Run("", func(t *testing.T) { diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 694a757a9abc..cb4964f2ca0d 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -200,7 +200,7 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { continue } - storeList, _, _ := sr.rq.allocator.storePool.getStoreList(storeFilterNone) + storeList, _, _ := sr.rq.allocator.storePool.getStoreList(storeFilterSuspect) sr.rebalanceStore(ctx, mode, storeList) } }) @@ -948,7 +948,7 @@ 
func (sr *StoreRebalancer) shouldNotMoveTo( // about node liveness. targetNodeID := candidateStore.Node.NodeID if targetNodeID != sr.rq.store.Ident.NodeID { - if !sr.rq.store.cfg.StorePool.isNodeReadyForRoutineReplicaTransfer(ctx, targetNodeID) { + if !sr.rq.allocator.storePool.isStoreReadyForRoutineReplicaTransfer(ctx, candidateStore.StoreID) { log.VEventf(ctx, 3, "refusing to transfer replica to n%d/s%d", targetNodeID, candidateStore.StoreID) return true diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index d311ed40c4ec..d9c3b82f32eb 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -516,10 +516,9 @@ func TestChooseRangeToRebalance(t *testing.T) { for _, tc := range testCases { t.Run("", func(t *testing.T) { - a.storePool.isNodeReadyForRoutineReplicaTransfer = func(_ context.Context, n roachpb.NodeID) bool { + a.storePool.isStoreReadyForRoutineReplicaTransfer = func(_ context.Context, storeID roachpb.StoreID) bool { for _, s := range tc.nonLive { - // NodeID match StoreIDs here, so this comparison is valid. - if roachpb.NodeID(s) == n { + if s == storeID { return false } } diff --git a/pkg/ui/src/views/cluster/containers/nodesOverview/index.tsx b/pkg/ui/src/views/cluster/containers/nodesOverview/index.tsx index 3d55887d2344..d242dfee955e 100644 --- a/pkg/ui/src/views/cluster/containers/nodesOverview/index.tsx +++ b/pkg/ui/src/views/cluster/containers/nodesOverview/index.tsx @@ -144,6 +144,8 @@ const getBadgeTypeByNodeStatus = ( return "warning"; case LivenessStatus.NODE_STATUS_DECOMMISSIONED: return "default"; + case LivenessStatus.NODE_STATUS_DRAINING: + return "warning"; case AggregatedNodeStatus.LIVE: return "default"; case AggregatedNodeStatus.WARNING:
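To summarize how the new status interacts with store-list filtering in this patch (getStoreListFromIDsLocked counts a suspect store as alive but withholds its descriptor from callers that pass storeFilterThrottled or storeFilterSuspect), here is a minimal standalone sketch; the names mirror the patch, but the types are simplified stand-ins rather than the real kvserver declarations:

package main

import "fmt"

// Simplified stand-ins for the store-pool types this patch touches; the real
// definitions live in pkg/kv/kvserver/store_pool.go.
type storeStatus int

const (
	storeStatusAvailable storeStatus = iota
	storeStatusSuspect
	storeStatusDead
)

type storeFilter int

const (
	storeFilterNone storeFilter = iota
	storeFilterThrottled
	storeFilterSuspect
)

// includeInStoreList mirrors the new storeStatusSuspect case in
// getStoreListFromIDsLocked: a suspect store still counts toward the alive
// store count, but its descriptor is only returned when the caller did not
// ask for the throttled or suspect filter.
func includeInStoreList(status storeStatus, filter storeFilter) bool {
	switch status {
	case storeStatusAvailable:
		return true
	case storeStatusSuspect:
		return filter != storeFilterThrottled && filter != storeFilterSuspect
	default:
		// Dead, unknown, and decommissioning stores are never returned.
		return false
	}
}

func main() {
	fmt.Println(includeInStoreList(storeStatusSuspect, storeFilterSuspect)) // false: excluded from lease and replica targets
	fmt.Println(includeInStoreList(storeStatusSuspect, storeFilterNone))   // true: still visible to callers that take everything
}

This is why TransferLeaseTarget and ShouldTransferLease now request storeFilterSuspect: a store that recently failed liveness keeps the replicas it already has, but it is not offered new leases or replicas until the suspect window has passed.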